chunk_record.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. """
  2. RagFlow Chunk 上传记录管理模块
  3. 该模块负责处理 RagFlow Chunk 上传记录的数据库操作,包括:
  4. - 记录 Chunk 上传任务到定时任务表
  5. - 提供统一的接口供外部调用
  6. """
  7. import json
  8. from typing import Dict, Any, Optional
  9. from src.utils.mysql import get_global_mysql_client
  10. from datetime import datetime, timedelta
  11. class ChunkRecordService:
  12. """Chunk 上传记录服务"""
  13. def __init__(self):
  14. """初始化 Chunk 记录服务"""
  15. self.mysql_client = get_global_mysql_client()
  16. def record_chunk_add(self, database_name: str, table_name: str, chunk_id: str,
  17. cond: Optional[str] = None, data: Dict[str, Any] = {}) -> None:
  18. """
  19. 记录 Chunk 上传信息到 MySQL 定时任务表
  20. Args:
  21. database_name: 数据库 ID
  22. table_name: 数据表 ID
  23. chunk_id: 上传成功返回的 Chunk ID
  24. image_path: 图片路径
  25. cond: 条件字符串,由调用方传递
  26. data: 数据字符串,由调用方传递或自动生成
  27. """
  28. # 计算20秒后的时间
  29. scheduled_time = datetime.now() + timedelta(seconds=60)
  30. # 判断cond与data不能为空
  31. if not cond and not data:
  32. raise ValueError("cond and data must be provided")
  33. # 准备插入数据(所有记录status固定为"未执行")
  34. insert_data = {
  35. "database_name": database_name,
  36. "table_name": table_name,
  37. "chunk_id": chunk_id,
  38. "cond": cond,
  39. "update_data": json.dumps(data),
  40. "scheduled_time": scheduled_time,
  41. "status": "未执行"
  42. }
  43. try:
  44. # 插入记录到 MySQL 定时任务表
  45. self.mysql_client.execute(
  46. "INSERT INTO ragflow_chunk_record (database_name, table_name, chunk_id, cond, update_data, scheduled_time, status) "
  47. "VALUES (%(database_name)s, %(table_name)s, %(chunk_id)s, %(cond)s, %(update_data)s, %(scheduled_time)s, %(status)s)",
  48. insert_data
  49. )
  50. except Exception as e:
  51. raise Exception(f"Failed to record chunk upload: {e}")
  52. # 创建全局实例
  53. _chunk_record_service = None
  54. def get_chunk_record_service() -> ChunkRecordService:
  55. """
  56. 获取 Chunk 记录服务实例(单例模式)
  57. Returns:
  58. ChunkRecordService 实例
  59. """
  60. global _chunk_record_service
  61. if _chunk_record_service is None:
  62. _chunk_record_service = ChunkRecordService()
  63. return _chunk_record_service