"""RagFlow chunk upload record management.

This module handles the database side of RagFlow chunk upload records:

- recording chunk upload tasks into the scheduled-task table
  (``ragflow_chunk_record``);
- exposing a single accessor (:func:`get_chunk_record_service`) for
  external callers.
"""

import json
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

from src.utils.mysql import get_global_mysql_client

# Default delay between recording a task and its scheduled execution time.
_DEFAULT_DELAY_SECONDS = 60

# Every newly recorded task starts in this status ("not executed").
_STATUS_PENDING = "未执行"

# Parameterized insert; values are bound by the MySQL client, never
# interpolated into the SQL text.
_INSERT_SQL = (
    "INSERT INTO ragflow_chunk_record (database_name, table_name, chunk_id, cond, update_data, scheduled_time, status) "
    "VALUES (%(database_name)s, %(table_name)s, %(chunk_id)s, %(cond)s, %(update_data)s, %(scheduled_time)s, %(status)s)"
)


class ChunkRecordService:
    """Service that records chunk upload tasks in MySQL."""

    def __init__(self):
        """Initialize the service with the process-wide MySQL client."""
        self.mysql_client = get_global_mysql_client()

    def record_chunk_add(
        self,
        database_name: str,
        table_name: str,
        chunk_id: str,
        cond: Optional[str] = None,
        data: Optional[Dict[str, Any]] = None,
        *,
        delay_seconds: float = _DEFAULT_DELAY_SECONDS,
    ) -> None:
        """Record a chunk upload task in the MySQL scheduled-task table.

        Args:
            database_name: Database ID.
            table_name: Table ID.
            chunk_id: Chunk ID returned by a successful upload.
            cond: Condition string supplied by the caller.
            data: Update payload supplied by the caller; stored as JSON in
                the ``update_data`` column. ``None`` is treated as ``{}``.
            delay_seconds: How far in the future (relative to now) the task
                is scheduled. Defaults to 60 seconds.

        Raises:
            ValueError: If both ``cond`` and ``data`` are empty.
            TypeError: If ``data`` is not JSON-serializable.
            Exception: If the database insert fails; the underlying error
                is chained as ``__cause__``.
        """
        # Reject only a completely empty request: at least one of
        # cond / data has to carry something.
        if not cond and not data:
            raise ValueError("at least one of cond and data must be provided")

        # Avoid a shared mutable default; None serializes the same as the
        # old `{}` default did ("{}").
        payload = {} if data is None else data

        # NOTE(review): naive local time — presumably matches how the
        # scheduler reads scheduled_time; confirm.
        scheduled_time = datetime.now() + timedelta(seconds=delay_seconds)

        insert_data = {
            "database_name": database_name,
            "table_name": table_name,
            "chunk_id": chunk_id,
            "cond": cond,
            "update_data": json.dumps(payload),
            "scheduled_time": scheduled_time,
            "status": _STATUS_PENDING,
        }

        try:
            self.mysql_client.execute(_INSERT_SQL, insert_data)
        except Exception as e:
            # Keep the original error attached for debugging.
            raise Exception(f"Failed to record chunk upload: {e}") from e


# Lazily created module-level instance.
_chunk_record_service = None


def get_chunk_record_service() -> ChunkRecordService:
    """Return the shared ChunkRecordService, creating it on first use.

    The lazy initialization is not guarded by a lock.

    Returns:
        The ChunkRecordService instance.
    """
    global _chunk_record_service
    if _chunk_record_service is None:
        _chunk_record_service = ChunkRecordService()
    return _chunk_record_service