| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- """
- RagFlow Chunk 上传记录管理模块
- 该模块负责处理 RagFlow Chunk 上传记录的数据库操作,包括:
- - 记录 Chunk 上传任务到定时任务表
- - 提供统一的接口供外部调用
- """
- import json
- from typing import Dict, Any, Optional
- from src.utils.mysql import get_global_mysql_client
- from datetime import datetime, timedelta
- class ChunkRecordService:
- """Chunk 上传记录服务"""
-
- def __init__(self):
- """初始化 Chunk 记录服务"""
- self.mysql_client = get_global_mysql_client()
-
- def record_chunk_add(self, database_name: str, table_name: str, chunk_id: str,
- cond: Optional[str] = None, data: Dict[str, Any] = {}) -> None:
- """
- 记录 Chunk 上传信息到 MySQL 定时任务表
-
- Args:
- database_name: 数据库 ID
- table_name: 数据表 ID
- chunk_id: 上传成功返回的 Chunk ID
- image_path: 图片路径
- cond: 条件字符串,由调用方传递
- data: 数据字符串,由调用方传递或自动生成
- """
- # 计算20秒后的时间
- scheduled_time = datetime.now() + timedelta(seconds=60)
- # 判断cond与data不能为空
- if not cond and not data:
- raise ValueError("cond and data must be provided")
-
- # 准备插入数据(所有记录status固定为"未执行")
- insert_data = {
- "database_name": database_name,
- "table_name": table_name,
- "chunk_id": chunk_id,
- "cond": cond,
- "update_data": json.dumps(data),
- "scheduled_time": scheduled_time,
- "status": "未执行"
- }
-
- try:
- # 插入记录到 MySQL 定时任务表
- self.mysql_client.execute(
- "INSERT INTO ragflow_chunk_record (database_name, table_name, chunk_id, cond, update_data, scheduled_time, status) "
- "VALUES (%(database_name)s, %(table_name)s, %(chunk_id)s, %(cond)s, %(update_data)s, %(scheduled_time)s, %(status)s)",
- insert_data
- )
- except Exception as e:
- raise Exception(f"Failed to record chunk upload: {e}")
- # 创建全局实例
- _chunk_record_service = None
- def get_chunk_record_service() -> ChunkRecordService:
- """
- 获取 Chunk 记录服务实例(单例模式)
-
- Returns:
- ChunkRecordService 实例
- """
- global _chunk_record_service
- if _chunk_record_service is None:
- _chunk_record_service = ChunkRecordService()
- return _chunk_record_service
|