alair 3 месяцев назад
Родитель
Commit
771d7a081c
93 измененных файлов с 411 добавлено и 453 удалено
  1. 6 4
      .env
  2. 14 0
      .trae/documents/修复DBUtils依赖问题.md
  3. 21 0
      .trae/documents/修复PDF解析工作流中的f-string格式错误.md
  4. 1 0
      D:\\project\\work\\ragflow_plugs\\book\\output\\temp\\request.txt
  5. BIN
      __pycache__/main.cpython-312.pyc
  6. 19 0
      doc/init.sql
  7. 26 15
      main.py
  8. 4 1
      requirements.txt
  9. 0 0
      src/.env.example
  10. 0 0
      src/.gitignore
  11. 0 0
      src/HTTP API.md
  12. 0 0
      src/__init__.py
  13. 3 9
      src/agent/test_image_agent.py
  14. 0 0
      src/api/__init__.py
  15. 0 0
      src/api/dataset/__init__.py
  16. 0 0
      src/api/dataset/services/__init__.py
  17. 2 2
      src/api/dataset/services/dataset_manage_service.py
  18. 0 0
      src/api/db/__init__.py
  19. 0 0
      src/api/db/services/__init__.py
  20. 5 5
      src/api/db/services/infinity_search_service.py
  21. 16 9
      src/api/db/services/tag_service.py
  22. 3 8
      src/api/hybrid_search_mcp.py
  23. 0 0
      src/api/sdk/__init__.py
  24. 2 2
      src/api/sdk/dataset_manage.py
  25. 2 2
      src/api/search_infinity.py
  26. 6 8
      src/api/tag_manage.py
  27. 0 1
      src/common/models/pagination.py
  28. 0 0
      src/conf/__init__.py
  29. 0 0
      src/conf/age_level.json
  30. 0 0
      src/conf/config.py
  31. 0 0
      src/conf/infinity_mapping.json
  32. 0 0
      src/conf/rag_parser_config.py
  33. 2 0
      src/conf/settings.py
  34. 0 0
      src/ee.json
  35. 131 0
      src/job/chunk_update_job.py
  36. 0 0
      src/model/__init__.py
  37. 0 0
      src/model/jina_rerank.py
  38. 2 2
      src/model/multimodal_embedding.py
  39. 1 1
      src/model/openai_chat_model.py
  40. 1 1
      src/model/qwen_vl.py
  41. 1 1
      src/model/tracked_multi_embedding.py
  42. 1 1
      src/model/tracked_openai_embeddings.py
  43. 0 0
      src/model_output.md
  44. 0 0
      src/parser/image_parser/__init__.py
  45. 6 12
      src/parser/image_parser/image_parser_workflow.py
  46. 0 0
      src/parser/pdf_parser/__init__.py
  47. 36 28
      src/parser/pdf_parser/pdf_parser_workflow.py
  48. 0 0
      src/parser/pdf_parser/pdf_splitter.py
  49. 0 0
      src/parser/pdf_parser/test_service.py
  50. 0 0
      src/private_key.pem
  51. 0 0
      src/prompt/parser/page_parse_prompt.py
  52. 0 0
      src/public_key.pem
  53. 0 0
      src/utils/__init__.py
  54. 0 0
      src/utils/asymmetric_encryption.py
  55. 0 0
      src/utils/decorators/langfuse_trace_embedding.py
  56. 0 0
      src/utils/decorators/singleton.py
  57. 0 0
      src/utils/excel_util.py
  58. 0 0
      src/utils/file/__init__.py
  59. 1 1
      src/utils/file/file_utils.py
  60. 2 2
      src/utils/file/image_util.py
  61. 0 0
      src/utils/file/minio/__init__.py
  62. 2 2
      src/utils/file/minio/minio_util.py
  63. 0 0
      src/utils/http_client.py
  64. 0 0
      src/utils/infinity/README.md
  65. 0 0
      src/utils/infinity/__init__.py
  66. 1 1
      src/utils/infinity/client.py
  67. 1 1
      src/utils/infinity/pool.py
  68. 0 0
      src/utils/infinity/result_util.py
  69. 0 0
      src/utils/infinity/test_infinity.py
  70. 0 0
      src/utils/mysql/__init__.py
  71. 3 3
      src/utils/mysql/mysql_conn.py
  72. 1 1
      src/utils/mysql/mysql_pool.py
  73. 0 0
      src/utils/ragflow/__init__.py
  74. 0 0
      src/utils/ragflow/agent_service.py
  75. 0 0
      src/utils/ragflow/chat_service.py
  76. 74 0
      src/utils/ragflow/chunk_record.py
  77. 0 0
      src/utils/ragflow/chunk_service.py
  78. 0 0
      src/utils/ragflow/dataset_service.py
  79. 0 0
      src/utils/ragflow/document_service.py
  80. 0 0
      src/utils/ragflow/file_service.py
  81. 0 0
      src/utils/ragflow/openai_service.py
  82. 9 9
      src/utils/ragflow/ragflow_service.py
  83. 6 6
      src/workflow/search/dataset_search_workflow.py
  84. 0 0
      test/test_mysql.py
  85. 0 0
      test/test_ragflow_http_api.py
  86. 0 0
      test/test_search.py
  87. 0 0
      test/test_upload_tag.py
  88. 0 9
      utils/ragflow_sdk/__init__.py
  89. 0 17
      utils/ragflow_sdk/base_util.py
  90. 0 80
      utils/ragflow_sdk/chunk_util.py
  91. 0 45
      utils/ragflow_sdk/conf/rag_parser_config.py
  92. 0 71
      utils/ragflow_sdk/dataset_util.py
  93. 0 93
      utils/ragflow_sdk/document_util.py

+ 6 - 4
.env

@@ -33,7 +33,9 @@ INFINITY_DATABASE=book_image_db
 INFINITY_USER=admin
 INFINITY_USER=admin
 INFINITY_PASSWORD=admin
 INFINITY_PASSWORD=admin
 INFINITY_TABLE_NAME=book_page_image
 INFINITY_TABLE_NAME=book_page_image
-INFINITY_PAGE_DATASET_ID=90d73295f02411f0a76b0242c0a85002
+INFINITY_RAGFLOW_DATABASE=default_db
+INFINITY_PAGE_DATASET_ID=68dc10bdf3b611f0b1bd0efbd173881b
+INFINITY_PAGE_TABLE_NAME=ragflow_67d93316eb1f11f0af39f2e22d6e0857_68dc10bdf3b611f0b1bd0efbd173881b
 
 
 # MySQL配置
 # MySQL配置
 MYSQL_HOST=192.168.0.4
 MYSQL_HOST=192.168.0.4
@@ -60,6 +62,6 @@ TAG_DOCUMENT_ID=3dda0a90f1e211f0a3b80242c0a85002
 TAG_TABLE_NAME=ragflow_92162247e93e11f084830242ac1d0002_18caf531f04d11f095670242c0a85002  6d2e0990f28b11f0b5200242c0a85002
 TAG_TABLE_NAME=ragflow_92162247e93e11f084830242ac1d0002_18caf531f04d11f095670242c0a85002  6d2e0990f28b11f0b5200242c0a85002
 
 
 # LANGFUSE
 # LANGFUSE
-LANGFUSE_PUBLIC_KEY=pk-lf-6918a148-be72-4211-a22d-183a23e6643e
-LANGFUSE_SECRET_KEY=sk-lf-9c64d7ed-1618-4da9-a775-33e39b05448e
-LANGFUSE_HOST=http://192.168.16.134:3000
+# LANGFUSE_PUBLIC_KEY=pk-lf-6918a148-be72-4211-a22d-183a23e6643e
+# LANGFUSE_SECRET_KEY=sk-lf-9c64d7ed-1618-4da9-a775-33e39b05448e
+# LANGFUSE_HOST=http://192.168.16.134:3000

+ 14 - 0
.trae/documents/修复DBUtils依赖问题.md

@@ -0,0 +1,14 @@
+## 1. 问题分析
+
+从错误信息和代码分析,我发现了以下几个问题:
+
+### 1.1 主要错误
+
+1. **`Connection.cursor() got an unexpected keyword argument 'cursorclass'`**
+   - 错误发生在 `mysql_conn.py` 文件中
+   - 原因:DBUtils 连接池返回的连接对象的 `cursor()` 方法不接受 `cursorclass` 参数
+   - 解决:移除 `cursor()` 方法中的 `cursorclass` 参数
+
+2. **`KeyError: 'id'`**
+   - 错误发生在 `chunk_update_job.py` 文件中
+   - 原因:查询

+ 21 - 0
.trae/documents/修复PDF解析工作流中的f-string格式错误.md

@@ -0,0 +1,21 @@
+## 问题分析
+
+在 `pdf_parser_workflow.py` 文件中,`_parse_single_page` 方法使用了 f-string 来构建提示词,但提示词中包含了用于定义 JSON 格式的字面量大括号 `{}`。在 f-string 中,大括号被用于插入变量,因此 Python 解释器会尝试将这些字面量大括号解析为格式说明符,导致 `Invalid format specifier` 错误。
+
+## 修复方案
+
+将提示词中所有用于定义 JSON 格式的字面量大括号 `{}` 替换为转义形式 `{{}}`,这样 Python 解释器就会将它们视为普通字符而不是格式说明符。
+
+## 具体修改位置
+
+- 文件:`/Users/alair/project/ai/graph_rag_server/src/parser/pdf_parser/pdf_parser_workflow.py`
+- 方法:`_parse_single_page`
+- 行号:282-314
+
+## 修复内容
+
+将提示词中的 JSON 示例部分的所有大括号替换为双大括号,例如:
+- `{` 替换为 `{{`
+- `}` 替换为 `}}`
+
+这样修改后,f-string 就会正确解析变量 `{page["page_number"]}`,同时将 JSON 格式中的大括号视为普通字符输出。

+ 1 - 0
D:\\project\\work\\ragflow_plugs\\book\\output\\temp\\request.txt

@@ -0,0 +1 @@
+{'method': 'POST', 'url': 'http://192.168.0.103:9380/api/v1/datasets/68dc10bdf3b611f0b1bd0efbd173881b/documents', 'data': None, 'json': None, 'headers': None, 'files': True}

BIN
__pycache__/main.cpython-312.pyc


+ 19 - 0
doc/init.sql

@@ -0,0 +1,19 @@
+CREATE TABLE IF NOT EXISTS ragflow_chunk_record (
+    id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键ID',
+	dataset_name VARCHAR(64) NOT NULL COMMENT '数据集名称',
+	table_name VARCHAR(128) NOT NULL COMMENT '数据表名称',
+	chunk_id VARCHAR(64) NOT NULL COMMENT '分块ID',
+    cond VARCHAR(100) COMMENT '条件参数',
+    update_data JSON COMMENT '数据参数',
+    scheduled_time DATETIME NOT NULL COMMENT '计划执行时间(当前时间+20秒)',
+    status VARCHAR(20) NOT NULL DEFAULT '0' COMMENT '执行状态:0:未执行/1:执行成功/2:执行失败',
+	error_message VARCHAR(255) COMMENT '失败信息',
+	executed_time DATETIME COMMENT '执行时间',
+    created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+    INDEX idx_dataset_name (dataset_name),
+    INDEX idx_table_name (table_name),
+    INDEX idx_chunk_id (chunk_id),
+    INDEX idx_scheduled_time (scheduled_time),
+    INDEX idx_status (status)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='RagflowChunk记录表';

+ 26 - 15
main.py

@@ -4,44 +4,55 @@ from fastapi import FastAPI
 from contextlib import asynccontextmanager
 from contextlib import asynccontextmanager
 
 
 # 导入所有子应用
 # 导入所有子应用
-from api.search_infinity import app as search_app
-from api.tag_manage import app as tag_app
-from api.sdk.dataset_manage import app as dataset_app
+from src.api.search_infinity import app as search_app
+from src.api.tag_manage import app as tag_app
+from src.api.sdk.dataset_manage import app as dataset_app
 
 
 # 定义主应用的生命周期管理
 # 定义主应用的生命周期管理
 @asynccontextmanager
 @asynccontextmanager
 async def main_lifespan(app: FastAPI):
 async def main_lifespan(app: FastAPI):
     """主应用生命周期管理"""
     """主应用生命周期管理"""
-    from utils.infinity import get_client, close_client
-    print("=== Infinity API Gateway 启动 ===")
+    from src.utils.infinity import get_client, close_client
     # 1. 初始化Infinity全局客户端(在服务启动时)
     # 1. 初始化Infinity全局客户端(在服务启动时)
     get_client(database="book_image_db")
     get_client(database="book_image_db")
     print("✅ Infinity客户端已初始化")
     print("✅ Infinity客户端已初始化")
     
     
     # 2. 初始化MySQL全局客户端
     # 2. 初始化MySQL全局客户端
-    from utils.mysql import init_global_mysql_client, close_global_mysql_client
+    from src.utils.mysql import init_global_mysql_client, close_global_mysql_client
     init_global_mysql_client()
     init_global_mysql_client()
     print("✅ MySQL客户端已初始化")
     print("✅ MySQL客户端已初始化")
     
     
     # 3. 初始化MinIO全局客户端并校验存储桶
     # 3. 初始化MinIO全局客户端并校验存储桶
-    from utils.file.minio.minio_util import init_minio_client, close_minio_client
+    from src.utils.file.minio.minio_util import init_minio_client, close_minio_client
     init_minio_client(check_bucket=True)
     init_minio_client(check_bucket=True)
     print("✅ MinIO客户端已初始化并校验存储桶")
     print("✅ MinIO客户端已初始化并校验存储桶")
+
+    # 4. 启动Chunk更新定时任务
+    from src.job.chunk_update_job import start_scheduler, shutdown_scheduler
+    start_scheduler()
+    print("✅ Chunk update scheduler started")
     
     
     yield
     yield
 
 
-    print("=== Infinity API Gateway 关闭 ===")
-    # 1. 关闭Infinity全局客户端(在服务关闭时)
+    # 1. 关闭Chunk更新定时任务
+    shutdown_scheduler()
+    print("✅ Chunk update scheduler shutdown")
+
+    # 2. 关闭MinIO全局客户端
+    close_minio_client()
+    print("✅ MinIO客户端已关闭")
+
+    # 3. 关闭MySQL全局客户端
+    close_global_mysql_client()
+    print("✅ MySQL客户端已关闭")
+
+    # 4. 关闭Infinity全局客户端(在服务关闭时)
     close_client()
     close_client()
     print("✅ Infinity客户端已关闭")
     print("✅ Infinity客户端已关闭")
     
     
-    # 2. 关闭MySQL全局客户端
-    close_global_mysql_client()
-    print("✅ MySQL客户端已关闭")
+
     
     
-    # 3. 关闭MinIO全局客户端
-    close_minio_client()
-    print("✅ MinIO客户端已关闭")
+
 
 
 # 创建主应用
 # 创建主应用
 main_app = FastAPI(
 main_app = FastAPI(

+ 4 - 1
requirements.txt

@@ -7,4 +7,7 @@ Pillow
 python-dotenv
 python-dotenv
 elasticsearch==8.11.1
 elasticsearch==8.11.1
 infinity-emb
 infinity-emb
-pymysql
+pymysql
+dbutils
+apscheduler
+dashscope

+ 0 - 0
.env.example → src/.env.example


+ 0 - 0
.gitignore → src/.gitignore


+ 0 - 0
HTTP API.md → src/HTTP API.md


+ 0 - 0
__init__.py → src/__init__.py


+ 3 - 9
agent/test_image_agent.py → src/agent/test_image_agent.py

@@ -1,15 +1,9 @@
 from langchain.chat_models import init_chat_model
 from langchain.chat_models import init_chat_model
 from langchain.messages import SystemMessage, HumanMessage
 from langchain.messages import SystemMessage, HumanMessage
-import sys
-import os
 from PIL import Image
 from PIL import Image
-
-# 添加项目根目录到Python路径
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-from utils.infinity_util import InfinityVectorDB
-from model.multimodal_embedding import Embedding
-from conf.settings import model_settings, ragflow_settings, vector_db_settings
+from src.utils.infinity_util import InfinityVectorDB
+from src.model.multimodal_embedding import Embedding
+from src.conf.settings import model_settings, ragflow_settings, vector_db_settings
 
 
 system_prompt = """
 system_prompt = """
 【角色设定】 你是一位拥有丰富经验的儿童绘本编辑与阅读推广人,擅长从视觉美学、儿童心理学和文学创作三个维度深度解读绘本。
 【角色设定】 你是一位拥有丰富经验的儿童绘本编辑与阅读推广人,擅长从视觉美学、儿童心理学和文学创作三个维度深度解读绘本。

+ 0 - 0
api/__init__.py → src/api/__init__.py


+ 0 - 0
api/dataset/__init__.py → src/api/dataset/__init__.py


+ 0 - 0
api/dataset/services/__init__.py → src/api/dataset/services/__init__.py


+ 2 - 2
api/dataset/services/dataset_manage_service.py → src/api/dataset/services/dataset_manage_service.py

@@ -10,8 +10,8 @@
 import os
 import os
 import tempfile
 import tempfile
 from typing import Dict, Any, Optional
 from typing import Dict, Any, Optional
-from parser.pdf_parser.pdf_parser_workflow import PDFParsingWorkflow
-from conf.settings import vector_db_settings
+from src.parser.pdf_parser.pdf_parser_workflow import PDFParsingWorkflow
+from src.conf.settings import vector_db_settings
 
 
 
 
 class DatasetManageService:
 class DatasetManageService:

+ 0 - 0
api/db/__init__.py → src/api/db/__init__.py


+ 0 - 0
api/db/services/__init__.py → src/api/db/services/__init__.py


+ 5 - 5
api/db/services/infinity_search_service.py → src/api/db/services/infinity_search_service.py

@@ -1,9 +1,9 @@
 from typing import Dict, Any, List
 from typing import Dict, Any, List
-from conf.settings import vector_db_settings
-from utils.infinity import InfinityClient
-from utils.file.image_util import image_util
-from model.multimodal_embedding import get_embedding_model
-from utils.infinity.result_util import convert_to_basic_types
+from src.conf.settings import vector_db_settings
+from src.utils.infinity import InfinityClient
+from src.utils.file.image_util import image_util
+from src.model.multimodal_embedding import get_embedding_model
+from src.utils.infinity.result_util import convert_to_basic_types
 
 
 class InfinitySearchService:
 class InfinitySearchService:
     def __init__(self, infinity_client: InfinityClient, vector_field: str = None, match_field: str = None, match_type: str = None, table_name: str = None):
     def __init__(self, infinity_client: InfinityClient, vector_field: str = None, match_field: str = None, match_type: str = None, table_name: str = None):

+ 16 - 9
api/db/services/tag_service.py → src/api/db/services/tag_service.py

@@ -1,8 +1,8 @@
 from typing import List, Dict, Any, Optional
 from typing import List, Dict, Any, Optional
 from abc import ABC, abstractmethod
 from abc import ABC, abstractmethod
-from utils.ragflow_sdk.chunk_util import ChunkUtil
-from utils.infinity import InfinityClient
-from conf.settings import tag_search_settings
+from src.utils.ragflow.ragflow_service import RAGFlowService
+from src.utils.infinity import InfinityClient
+from src.conf.settings import tag_search_settings
 
 
 
 
 
 
@@ -61,8 +61,8 @@ class TagServiceImpl(TagService):
         """
         """
         self.tag_dataset_id=tag_search_settings.tag_dataset_id
         self.tag_dataset_id=tag_search_settings.tag_dataset_id
         self.tag_document_id=tag_search_settings.tag_document_id
         self.tag_document_id=tag_search_settings.tag_document_id
+        self.ragflow_service = RAGFlowService()
         self.infinity_client = infinity_client
         self.infinity_client = infinity_client
-        self.chunk_util = ChunkUtil(self.tag_dataset_id, self.tag_document_id)
     
     
     def create_tag(self, tag_data: Dict[str, Any]) -> Dict[str, Any]:
     def create_tag(self, tag_data: Dict[str, Any]) -> Dict[str, Any]:
         """
         """
@@ -78,8 +78,11 @@ class TagServiceImpl(TagService):
         tag_desc = tag_data["description"]
         tag_desc = tag_data["description"]
         age_range = tag_data["age_range"]
         age_range = tag_data["age_range"]
         # 步骤1:将分块数据写入ragflow
         # 步骤1:将分块数据写入ragflow
-        chunk = self.chunk_util.add_chunk(document_id=self.tag_document_id, content=tag_desc, important_keywords=[age_range])
-        chunk_id = chunk["id"]
+        chunk = self.ragflow_service.create_chunk(dataset_id=self.tag_dataset_id,
+                                                  document_id=self.tag_document_id,
+                                                  content=tag_desc,
+                                                  important_keywords=[age_range])
+        chunk_id = chunk["chunk"]["id"]
         # 步骤2: 调用infinity的update方法,将标签更新到块数据中
         # 步骤2: 调用infinity的update方法,将标签更新到块数据中
         res = self.infinity_client.update(f"id = {chunk_id}", {"tag_kwd": tag_name})
         res = self.infinity_client.update(f"id = {chunk_id}", {"tag_kwd": tag_name})
         if res["code"] != 0:
         if res["code"] != 0:
@@ -116,11 +119,15 @@ class TagServiceImpl(TagService):
             tag_desc = tag_data["description"]
             tag_desc = tag_data["description"]
             age_range = tag_data["age_range"]
             age_range = tag_data["age_range"]
             # 步骤1:将分块数据写入ragflow
             # 步骤1:将分块数据写入ragflow
-            chunk = self.chunk_util.add_chunk(dataset_name="标签", document_id=self.tag_document_id, content=tag_desc, important_keywords=[age_range])
-            print(f"分块数据写入成功, chunk_id: {chunk.id}")
+            chunk = self.ragflow_service.create_chunk(dataset_id=self.tag_dataset_id,
+                                                  document_id=self.tag_document_id,
+                                                  content=tag_desc,
+                                                  important_keywords=[age_range])
+            chunk_id = chunk["chunk"]["id"]        
+            print(f"分块数据写入成功, chunk_id: {chunk_id}")
             # 步骤2: 调用infinity的update方法,将标签更新到块数据中
             # 步骤2: 调用infinity的update方法,将标签更新到块数据中
             res = self.infinity_client.update(table_name="ragflow_92162247e93e11f084830242ac1d0002_18caf531f04d11f095670242c0a85002", 
             res = self.infinity_client.update(table_name="ragflow_92162247e93e11f084830242ac1d0002_18caf531f04d11f095670242c0a85002", 
-                                              cond=f"id = '{chunk.id}'", 
+                                              cond=f"id = '{chunk_id}'", 
                                               data={"tag_kwd": tag_name},
                                               data={"tag_kwd": tag_name},
                                               database_name="default_db")
                                               database_name="default_db")
             print(f"更新标签 {tag_name} 到 infinity 结果: {res}")
             print(f"更新标签 {tag_name} 到 infinity 结果: {res}")

+ 3 - 8
api/hybrid_search_mcp.py → src/api/hybrid_search_mcp.py

@@ -3,19 +3,14 @@
 混合检索MCP服务
 混合检索MCP服务
 使用fastmcp框架实现,提供图片解析后的向量化入库和混合检索功能
 使用fastmcp框架实现,提供图片解析后的向量化入库和混合检索功能
 """
 """
-
-import sys
-import os
 import requests
 import requests
 from io import BytesIO
 from io import BytesIO
 from typing import List, Dict, Any
 from typing import List, Dict, Any
 from fastmcp import FastMCP
 from fastmcp import FastMCP
-# 添加项目根目录到Python路径
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 from PIL import Image
 from PIL import Image
-from utils.infinity_util import InfinityVectorDB
-from model.multimodal_embedding import Embedding
-from conf.settings import model_settings, ragflow_settings, vector_db_settings
+from src.utils.infinity_util import InfinityVectorDB
+from src.model.multimodal_embedding import Embedding
+from src.conf.settings import model_settings, ragflow_settings, vector_db_settings
 
 
 
 
 
 

+ 0 - 0
api/sdk/__init__.py → src/api/sdk/__init__.py


+ 2 - 2
api/sdk/dataset_manage.py → src/api/sdk/dataset_manage.py

@@ -8,7 +8,7 @@
 
 
 from fastapi import FastAPI, HTTPException, UploadFile, File, Form
 from fastapi import FastAPI, HTTPException, UploadFile, File, Form
 from typing import Dict, Any
 from typing import Dict, Any
-from api.dataset.services.dataset_manage_service import DatasetManageService
+from src.api.dataset.services.dataset_manage_service import DatasetManageService
 
 
 
 
 # 创建 FastAPI 应用
 # 创建 FastAPI 应用
@@ -49,7 +49,7 @@ async def parse_pdf(
             pdf_filename=file.filename
             pdf_filename=file.filename
         )
         )
         
         
-        return {"success": True, "result": result}
+        return {"success": True}
     except HTTPException as e:
     except HTTPException as e:
         raise e
         raise e
     except Exception as e:
     except Exception as e:

+ 2 - 2
api/search_infinity.py → src/api/search_infinity.py

@@ -2,8 +2,8 @@
 
 
 from fastapi import FastAPI, HTTPException
 from fastapi import FastAPI, HTTPException
 from typing import List, Dict, Any, Optional
 from typing import List, Dict, Any, Optional
-from api.db.services.infinity_search_service import InfinitySearchService
-from utils.infinity import get_client
+from src.api.db.services.infinity_search_service import InfinitySearchService
+from src.utils.infinity import get_client
 
 
 
 
 # 创建FastAPI应用
 # 创建FastAPI应用

+ 6 - 8
api/tag_manage.py → src/api/tag_manage.py

@@ -1,14 +1,12 @@
 # 标签管理API服务
 # 标签管理API服务
-
-from fastapi import FastAPI, HTTPException, UploadFile, File
-from typing import List, Dict, Any, Optional
-from api.db.services.tag_service import TagServiceFactory
-from utils.infinity import get_client
-from utils.excel_util import excel_util
-from pydantic import BaseModel
 import os
 import os
 import tempfile
 import tempfile
-
+from pydantic import BaseModel
+from fastapi import FastAPI, HTTPException, UploadFile, File
+from typing import List, Dict, Any, Optional
+from src.api.db.services.tag_service import TagServiceFactory
+from src.utils.infinity import get_client
+from src.utils.excel_util import excel_util
 
 
 # 创建FastAPI应用
 # 创建FastAPI应用
 app = FastAPI(
 app = FastAPI(

+ 0 - 1
common/models/pagination.py → src/common/models/pagination.py

@@ -1,5 +1,4 @@
 from pydantic import BaseModel, Field
 from pydantic import BaseModel, Field
-from typing import Optional
 
 
 class Pagination(BaseModel):
 class Pagination(BaseModel):
     """通用分页与过滤模型"""
     """通用分页与过滤模型"""

+ 0 - 0
conf/__init__.py → src/conf/__init__.py


+ 0 - 0
conf/age_level.json → src/conf/age_level.json


+ 0 - 0
conf/config.py → src/conf/config.py


+ 0 - 0
conf/infinity_mapping.json → src/conf/infinity_mapping.json


+ 0 - 0
conf/rag_parser_config.py → src/conf/rag_parser_config.py


+ 2 - 0
conf/settings.py → src/conf/settings.py

@@ -74,6 +74,8 @@ class VectorDBSettings(BaseSettings):
     infinity_database: str = Field(default="test", alias="INFINITY_DATABASE")
     infinity_database: str = Field(default="test", alias="INFINITY_DATABASE")
     infinity_table_name: str = Field(default="test", alias="INFINITY_TABLE_NAME")
     infinity_table_name: str = Field(default="test", alias="INFINITY_TABLE_NAME")
     infinity_page_dataset_id: str = Field(default="", alias="INFINITY_PAGE_DATASET_ID")
     infinity_page_dataset_id: str = Field(default="", alias="INFINITY_PAGE_DATASET_ID")
+    infinity_page_table_name: str = Field(default="", alias="INFINITY_PAGE_TABLE_NAME")
+    infinity_ragflow_database: str = Field(default="default_db", alias="INFINITY_RAGFLOW_DATABASE")
     
     
     model_config = SettingsConfigDict(
     model_config = SettingsConfigDict(
         env_file=".env",
         env_file=".env",

+ 0 - 0
ee.json → src/ee.json


+ 131 - 0
src/job/chunk_update_job.py

@@ -0,0 +1,131 @@
+"""
+Chunk 更新定时任务
+
+该模块负责处理 ragflow_chunk_record 表中的定时任务,包括:
+- 定期查询到期的任务
+- 执行任务逻辑
+- 更新任务状态
+"""
+import time
+import json
+from datetime import datetime
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+from src.utils.mysql import get_global_mysql_client
+from src.utils.infinity import get_client
+
+# 初始化调度器
+_scheduler = None
+
+class ChunkUpdateJob:
+    """Chunk 更新定时任务服务"""
+    
+    def __init__(self):
+        """初始化定时任务服务"""
+        self.mysql_client = get_global_mysql_client()
+        self.infinity_client = get_client()
+    
+    def process_due_tasks(self):
+        """处理到期的任务"""
+        try:
+            # 查询状态为"未执行"且计划时间小于等于当前时间的任务
+            current_time = datetime.now()
+            tasks = self.mysql_client.fetch_all(
+                "SELECT id, dataset_name, table_name, chunk_id, cond, update_data FROM ragflow_chunk_record "
+                "WHERE status = %s AND scheduled_time <= %s",
+                ["0", current_time]
+            )
+            
+            # 处理每个任务
+            for task in tasks:
+                task_id = task["id"]
+                database_name = task["dataset_name"]
+                table_name = task["table_name"]
+                chunk_id = task["chunk_id"]
+                cond = task["cond"]
+                data = task["update_data"]
+                
+                try:
+                    # 执行任务逻辑
+                    self._execute_task(database_name, table_name, chunk_id, cond, data)
+                    
+                    # 更新任务状态为"执行成功"(状态码 1,见 doc/init.sql)
+                    self.mysql_client.execute(
+                        "UPDATE ragflow_chunk_record SET status = %s, executed_time = %s WHERE id = %s",
+                        ["1", datetime.now(), task_id]
+                    )
+                    
+                    print(f"Task {task_id} executed successfully")
+                except Exception as e:
+                    # 更新任务状态为"执行失败"(状态码 2,见 doc/init.sql)
+                    self.mysql_client.execute(
+                        "UPDATE ragflow_chunk_record SET status = %s, error_message = %s, executed_time = %s WHERE id = %s",
+                        ["2", str(e), datetime.now(), task_id]
+                    )
+                    
+                    print(f"Task {task_id} execution failed: {e}")
+        except Exception as e:
+            print(f"Failed to process due tasks: {e}")
+    
+    def _execute_task(self, database_name: str, table_name: str, chunk_id: str, 
+                      cond: str, data: dict) -> None:
+        """
+        执行具体的任务逻辑
+        
+        Args:
+            database_name: 数据库名称
+            table_name: 表名称
+            chunk_id: Chunk ID
+            cond: 条件字符串
+            data: 数据字典
+        """
+        # TODO: 根据业务需求实现具体的任务执行逻辑
+        # 示例:更新 Infinity 中的 Chunk 数据
+        try:
+            # 使用 Infinity 客户端执行更新操作
+            # 这里需要根据实际的 Infinity API 进行调整
+            if cond and data:
+                self.infinity_client.update(
+                    table_name=table_name,
+                    cond=cond,
+                    data=json.loads(data),
+                    database_name=database_name
+                )
+            print(f"Updated chunk {chunk_id} in {database_name}.{table_name}")
+        except Exception as e:
+            raise Exception(f"Failed to update chunk {chunk_id}: {e}")
+
+
+def start_scheduler():
+    """启动定时任务调度器"""
+    global _scheduler
+    
+    if _scheduler is None:
+        # 创建调度器
+        _scheduler = BackgroundScheduler()
+        
+        # 创建任务实例
+        chunk_update_job = ChunkUpdateJob()
+        
+        # 添加定时任务,每30秒执行一次
+        _scheduler.add_job(
+            func=chunk_update_job.process_due_tasks,
+            trigger=IntervalTrigger(seconds=30),
+            id="chunk_update_job",
+            name="Process due chunk update tasks",
+            replace_existing=True
+        )
+        
+        # 启动调度器
+        _scheduler.start()
+        print("✅ Chunk update scheduler started")
+
+
+def shutdown_scheduler():
+    """关闭定时任务调度器"""
+    global _scheduler
+    
+    if _scheduler is not None:
+        _scheduler.shutdown()
+        _scheduler = None
+        print("✅ Chunk update scheduler shutdown")

+ 0 - 0
model/__init__.py → src/model/__init__.py


+ 0 - 0
model/jina_rerank.py → src/model/jina_rerank.py


+ 2 - 2
model/multimodal_embedding.py → src/model/multimodal_embedding.py

@@ -4,9 +4,9 @@ import base64
 import io
 import io
 from langchain_openai import OpenAIEmbeddings
 from langchain_openai import OpenAIEmbeddings
 from dashscope import MultiModalEmbedding
 from dashscope import MultiModalEmbedding
-from conf.settings import model_settings
+from src.conf.settings import model_settings
 from langfuse import observe
 from langfuse import observe
-from utils.file.image_util import image_util
+from src.utils.file.image_util import image_util
 
 
 class Embedding:
 class Embedding:
     """Embedding模型工具"""
     """Embedding模型工具"""

+ 1 - 1
model/openai_chat_model.py → src/model/openai_chat_model.py

@@ -1,7 +1,7 @@
 from typing import Optional, Dict, Any
 from typing import Optional, Dict, Any
 from langchain.chat_models import init_chat_model
 from langchain.chat_models import init_chat_model
 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
-from conf.settings import model_settings
+from src.conf.settings import model_settings
 
 
 class OpenAIChatModel:
 class OpenAIChatModel:
     """
     """

+ 1 - 1
model/qwen_vl.py → src/model/qwen_vl.py

@@ -3,7 +3,7 @@ from PIL import Image
 import base64
 import base64
 import io
 import io
 from langchain.chat_models import init_chat_model
 from langchain.chat_models import init_chat_model
-from conf.settings import model_settings
+from src.conf.settings import model_settings
 from langfuse.langchain import CallbackHandler
 from langfuse.langchain import CallbackHandler
 
 
 class QWenVLParser:
 class QWenVLParser:

+ 1 - 1
model/tracked_multi_embedding.py → src/model/tracked_multi_embedding.py

@@ -1,6 +1,6 @@
 from model.multimodal_embedding import Embedding
 from model.multimodal_embedding import Embedding
 from PIL import Image
 from PIL import Image
-from utils.decorators.langfuse_trace_embedding import langfuse_trace_embedding
+from src.utils.decorators.langfuse_trace_embedding import langfuse_trace_embedding
 
 
 class TrackedMultiEmbeddings(Embedding):
 class TrackedMultiEmbeddings(Embedding):
     
     

+ 1 - 1
model/tracked_openai_embeddings.py → src/model/tracked_openai_embeddings.py

@@ -1,5 +1,5 @@
 from langchain_openai import OpenAIEmbeddings
 from langchain_openai import OpenAIEmbeddings
-from utils.decorators.langfuse_trace_embedding import langfuse_trace_embedding
+from src.utils.decorators.langfuse_trace_embedding import langfuse_trace_embedding
 
 
 class TrackedOpenAIEmbeddings(OpenAIEmbeddings):
 class TrackedOpenAIEmbeddings(OpenAIEmbeddings):
     
     

+ 0 - 0
model_output.md → src/model_output.md


+ 0 - 0
parser/image_parser/__init__.py → src/parser/image_parser/__init__.py


+ 6 - 12
parser/image_parser/image_parser_workflow.py → src/parser/image_parser/image_parser_workflow.py

@@ -2,25 +2,19 @@
 """
 """
 图片解析工作流
 图片解析工作流
 """
 """
-
-import sys
-import os
 import concurrent.futures
 import concurrent.futures
 from concurrent.futures import ThreadPoolExecutor
 from concurrent.futures import ThreadPoolExecutor
 from PIL import Image
 from PIL import Image
 import requests
 import requests
-# 添加项目根目录到Python路径
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
 from langgraph.graph import StateGraph, START, END
 from langgraph.graph import StateGraph, START, END
 from typing import List, Dict, Any, Annotated
 from typing import List, Dict, Any, Annotated
 from pydantic import BaseModel, Field, ConfigDict
 from pydantic import BaseModel, Field, ConfigDict
-from model.qwen_vl import QWenVLParser
-from utils.ragflow.ragflow_service import RAGFlowService
-from model.multimodal_embedding import Embedding
-from utils.minio.image_util import image_util
-from conf.settings import model_settings
-from utils.infinity import get_client
+from src.model.qwen_vl import QWenVLParser
+from src.utils.ragflow.ragflow_service import RAGFlowService
+from src.model.multimodal_embedding import Embedding
+from src.utils.file.image_util import image_util
+from src.conf.settings import model_settings
+from src.utils.infinity import get_client
 
 
 # 定义工作流状态类
 # 定义工作流状态类
 class ImageParsingState(BaseModel):
 class ImageParsingState(BaseModel):

+ 0 - 0
parser/pdf_parser/__init__.py → src/parser/pdf_parser/__init__.py


+ 36 - 28
parser/pdf_parser/pdf_parser_workflow.py → src/parser/pdf_parser/pdf_parser_workflow.py

@@ -5,14 +5,15 @@ from concurrent.futures import ThreadPoolExecutor
 from langgraph.graph import StateGraph, START, END
 from langgraph.graph import StateGraph, START, END
 from typing import List, Dict, Any
 from typing import List, Dict, Any
 from pydantic import BaseModel, Field, ConfigDict
 from pydantic import BaseModel, Field, ConfigDict
-from parser.pdf_parser.pdf_splitter import PDFSplitter
-from model.qwen_vl import QWenVLParser
-from utils.ragflow.ragflow_service import RAGFlowService
-from model.multimodal_embedding import Embedding
-from conf.settings import model_settings, vector_db_settings
-from utils.infinity import get_client
+from src.parser.pdf_parser.pdf_splitter import PDFSplitter
+from src.model.qwen_vl import QWenVLParser
+from src.utils.ragflow.ragflow_service import RAGFlowService
+from src.utils.ragflow.chunk_record import get_chunk_record_service
+from src.model.multimodal_embedding import Embedding
+from src.conf.settings import model_settings, vector_db_settings, minio_settings
+from src.utils.infinity import get_client
 from langfuse.langchain import CallbackHandler
 from langfuse.langchain import CallbackHandler
-from conf.rag_parser_config import RagParserDefaults
+from src.conf.rag_parser_config import RagParserDefaults
 
 
 # 定义工作流状态类
 # 定义工作流状态类
 class PDFParsingState(BaseModel):
 class PDFParsingState(BaseModel):
@@ -258,9 +259,12 @@ class PDFParsingWorkflow:
     
     
     def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
     def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
         """解析单个页面(用于并行处理)"""
         """解析单个页面(用于并行处理)"""
-        prompt = """
+        page_number = page["page_number"]
+        image = page["image"]
+        prompt = f"""
             角色定位:你是一位顶尖的儿童绘本分析师与视觉工程专家,擅长将插画视觉信息转化为高精度的结构化元数据。
             角色定位:你是一位顶尖的儿童绘本分析师与视觉工程专家,擅长将插画视觉信息转化为高精度的结构化元数据。
             任务描述:请深度解析提供的绘本页面,不仅提取基本要素,还要进行“像素级”的特征拆解。重点关注角色的微表情、服饰纹理、环境光效、构图视角及整体艺术风格。
             任务描述:请深度解析提供的绘本页面,不仅提取基本要素,还要进行“像素级”的特征拆解。重点关注角色的微表情、服饰纹理、环境光效、构图视角及整体艺术风格。
+            当前提取页码为:{page_number}
             提取维度:
             提取维度:
             艺术风格 (Style):包括笔触(如水彩、蜡笔)、线条粗细、整体色调偏好。
             艺术风格 (Style):包括笔触(如水彩、蜡笔)、线条粗细、整体色调偏好。
             角色特征 (Character):五官细节、肢体动作的动态感、衣物材质、标志性配饰。
             角色特征 (Character):五官细节、肢体动作的动态感、衣物材质、标志性配饰。
@@ -277,44 +281,41 @@ class PDFParsingWorkflow:
             描述精度:单条描述需包含具体视觉属性(颜色、形状、质感),字数控制在50字以内。
             描述精度:单条描述需包含具体视觉属性(颜色、形状、质感),字数控制在50字以内。
             格式要求:严谨按照指定的JSON结构输出。  
             格式要求:严谨按照指定的JSON结构输出。  
             json格式:
             json格式:
-            {
-                "page_meta": {
-                "page_number": 1,
+            {{
+                "page_meta": {{
+                "page_number": {page_number},
                 "content_text": "页面原文本内容",
                 "content_text": "页面原文本内容",
-                "overall_style": {
+                "overall_style": {{
                     "art_medium": "艺术媒介(如:手绘水彩、矢量平涂、3D渲染)",
                     "art_medium": "艺术媒介(如:手绘水彩、矢量平涂、3D渲染)",
                     "color_palette": ["主色调1", "主色调2"],
                     "color_palette": ["主色调1", "主色调2"],
                     "lighting": "光影描述(如:柔和侧光、清晨自然光)",
                     "lighting": "光影描述(如:柔和侧光、清晨自然光)",
                     "composition": "构图(如:三分法、对角线构图、大远景)"
                     "composition": "构图(如:三分法、对角线构图、大远景)"
-                    }
-                },
+                    }}
+                }},
                 "elements": [
                 "elements": [
-                    {
+                    {{
                         "element_name": "元素名称(如:小兔子)",
                         "element_name": "元素名称(如:小兔子)",
                         "character_name": "角色名称(如果有,没有的话,角色名称为空字符串)",
                         "character_name": "角色名称(如果有,没有的话,角色名称为空字符串)",
                         "category": "分类(角色/场景/道具)",
                         "category": "分类(角色/场景/道具)",
                         "spatial_layer": "所在层级(前景/中景/背景)",
                         "spatial_layer": "所在层级(前景/中景/背景)",
-                        "visual_attributes": {
+                        "visual_attributes": {{
                             "appearance": "外貌细节描述(发型、五官、材质感)",
                             "appearance": "外貌细节描述(发型、五官、材质感)",
                             "action_emotion": "行为动作与情感流露",
                             "action_emotion": "行为动作与情感流露",
                             "color_detail": "像素级颜色描述(如:淡茱萸粉、薄荷绿)",
                             "color_detail": "像素级颜色描述(如:淡茱萸粉、薄荷绿)",
                             "ability_tag": "如果为角色,其表现出的正面能力/特质"
                             "ability_tag": "如果为角色,其表现出的正面能力/特质"
-                        },
-                        "content_tags": {
+                        }},
+                        "content_tags": {{
                             "theme": ["自然", "社交", "生活常识"], 
                             "theme": ["自然", "社交", "生活常识"], 
                             "object": ["动物", "服装", "植物"],
                             "object": ["动物", "服装", "植物"],
                             "emotion": ["快乐", "勇敢"]
                             "emotion": ["快乐", "勇敢"]
-                        },
+                        }},
                         "ability_tags": ["语言表达", "逻辑思维", "自我认知"],
                         "ability_tags": ["语言表达", "逻辑思维", "自我认知"],
                         "description": "综合性简洁描述(50字内)"
                         "description": "综合性简洁描述(50字内)"
-                    }
+                    }}
                 ]
                 ]
-            }
+            }}
             """
             """
         
         
-        page_number = page["page_number"]
-        image = page["image"]
-        
         print(f"开始解析第 {page_number} 页")
         print(f"开始解析第 {page_number} 页")
         
         
         # 使用QWEN VL模型解析图像
         # 使用QWEN VL模型解析图像
@@ -380,6 +381,8 @@ class PDFParsingWorkflow:
             page_number = parsed_result.get("page_number")
             page_number = parsed_result.get("page_number")
             text = parsed_result.get("content", "")
             text = parsed_result.get("content", "")
             image_path = state.split_pages[i].get("image_path")
             image_path = state.split_pages[i].get("image_path")
+            # 截取url中的图片名(NOTE(review): 下一行 f-string 内部再次使用双引号 split(".") 在 Python < 3.12 会触发 SyntaxError,建议改为 split('.'))
+            img_id = f"{vector_db_settings.infinity_page_dataset_id}-{os.path.basename(image_path).split(".")[0]}"
             
             
             # 上传单页到RagFlow Chunk
             # 上传单页到RagFlow Chunk
             chunk = self.ragflow_service.create_chunk(dataset_id=state.page_dataset_id, 
             chunk = self.ragflow_service.create_chunk(dataset_id=state.page_dataset_id, 
@@ -387,10 +390,15 @@ class PDFParsingWorkflow:
                                               content=text)
                                               content=text)
             chunk_id = chunk["chunk"]["id"]
             chunk_id = chunk["chunk"]["id"]
             print(f"上传第 {page_number} 页,Chunk ID: {chunk_id}")
             print(f"上传第 {page_number} 页,Chunk ID: {chunk_id}")
-            # # 睡眠50ms,避免上传过快
-            # time.sleep(0.05)
-            # result = get_client().update(database_name=state.dataset_name, table_name="", cond=f"id = '{chunk_id}'", data={"img_id": img_id})
-            # print(f"更新第 {page_number} 页,Chunk ID: {chunk_id},结果: {result}")
+
+            # 记录到定时任务表
+            get_chunk_record_service().record_chunk_add(
+                database_name=vector_db_settings.infinity_ragflow_database,
+                table_name=vector_db_settings.infinity_page_table_name,
+                chunk_id=chunk_id,
+                cond=f"id = '{chunk_id}'",
+                data={"img_id": img_id}
+            )
 
 
     def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
     def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
         """向量化入库节点"""
         """向量化入库节点"""

+ 0 - 0
parser/pdf_parser/pdf_splitter.py → src/parser/pdf_parser/pdf_splitter.py


+ 0 - 0
parser/pdf_parser/test_service.py → src/parser/pdf_parser/test_service.py


+ 0 - 0
private_key.pem → src/private_key.pem


+ 0 - 0
prompt/parser/page_parse_prompt.py → src/prompt/parser/page_parse_prompt.py


+ 0 - 0
public_key.pem → src/public_key.pem


+ 0 - 0
utils/__init__.py → src/utils/__init__.py


+ 0 - 0
utils/asymmetric_encryption.py → src/utils/asymmetric_encryption.py


+ 0 - 0
utils/decorators/langfuse_trace_embedding.py → src/utils/decorators/langfuse_trace_embedding.py


+ 0 - 0
utils/decorators/singleton.py → src/utils/decorators/singleton.py


+ 0 - 0
utils/excel_util.py → src/utils/excel_util.py


+ 0 - 0
utils/file/minio/__init__.py → src/utils/file/__init__.py


+ 1 - 1
utils/file/file_utils.py → src/utils/file/file_utils.py

@@ -4,5 +4,5 @@ import os
 def generate_unique_filename(filename: str) -> str:
 def generate_unique_filename(filename: str) -> str:
     """生成唯一文件名,避免重复"""
     """生成唯一文件名,避免重复"""
     ext = os.path.splitext(filename)[1]
     ext = os.path.splitext(filename)[1]
-    unique_id = str(uuid.uuid4())
+    unique_id = str(uuid.uuid4()).replace("-", "")
     return f"{unique_id}{ext}"
     return f"{unique_id}{ext}"

+ 2 - 2
utils/file/image_util.py → src/utils/file/image_util.py

@@ -8,8 +8,8 @@ import re
 from typing import List
 from typing import List
 from io import BytesIO
 from io import BytesIO
 from PIL import Image
 from PIL import Image
-from utils.file.minio.minio_util import MinIOUtil
-from utils.file.file_utils import generate_unique_filename
+from src.utils.file.minio.minio_util import MinIOUtil
+from src.utils.file.file_utils import generate_unique_filename
 
 
 
 
 class ImageUtil:
 class ImageUtil:

+ 0 - 0
utils/ragflow/__init__.py → src/utils/file/minio/__init__.py


+ 2 - 2
utils/file/minio/minio_util.py → src/utils/file/minio/minio_util.py

@@ -1,8 +1,8 @@
 from minio import Minio
 from minio import Minio
 from typing import BinaryIO
 from typing import BinaryIO
 from datetime import timedelta
 from datetime import timedelta
-from conf.settings import minio_settings
-from utils.file.file_utils import generate_unique_filename
+from src.conf.settings import minio_settings
+from src.utils.file.file_utils import generate_unique_filename
 
 
 # 全局MinIO客户端实例
 # 全局MinIO客户端实例
 _global_minio_client = None
 _global_minio_client = None

+ 0 - 0
utils/http_client.py → src/utils/http_client.py


+ 0 - 0
utils/infinity/README.md → src/utils/infinity/README.md


+ 0 - 0
utils/infinity/__init__.py → src/utils/infinity/__init__.py


+ 1 - 1
utils/infinity/client.py → src/utils/infinity/client.py

@@ -3,7 +3,7 @@ from infinity.common import ConflictType
 from typing import Dict, Any, List, Optional
 from typing import Dict, Any, List, Optional
 import threading
 import threading
 from contextlib import contextmanager
 from contextlib import contextmanager
-from conf.settings import vector_db_settings
+from src.conf.settings import vector_db_settings
 from .pool import InfinityConnectionPool
 from .pool import InfinityConnectionPool
 
 
 class InfinityClient:
 class InfinityClient:

+ 1 - 1
utils/infinity/pool.py → src/utils/infinity/pool.py

@@ -4,7 +4,7 @@ from typing import Dict, Any, List, Optional
 import threading
 import threading
 import time
 import time
 from contextlib import contextmanager
 from contextlib import contextmanager
-from conf.settings import vector_db_settings
+from src.conf.settings import vector_db_settings
 
 
 class InfinityConnectionPool:
 class InfinityConnectionPool:
     """
     """

+ 0 - 0
utils/infinity/result_util.py → src/utils/infinity/result_util.py


+ 0 - 0
utils/infinity/test_infinity.py → src/utils/infinity/test_infinity.py


+ 0 - 0
utils/mysql/__init__.py → src/utils/mysql/__init__.py


+ 3 - 3
utils/mysql/mysql_conn.py → src/utils/mysql/mysql_conn.py

@@ -13,7 +13,7 @@ from pymysql.cursors import DictCursor
 from typing import Any, List, Dict, Optional, Union
 from typing import Any, List, Dict, Optional, Union
 from contextlib import contextmanager
 from contextlib import contextmanager
 from .mysql_pool import get_mysql_pool, MySQLPool
 from .mysql_pool import get_mysql_pool, MySQLPool
-from utils.decorators.singleton import singleton
+from src.utils.decorators.singleton import singleton
 
 
 @singleton
 @singleton
 class MySQLConnection:
 class MySQLConnection:
@@ -66,13 +66,13 @@ class MySQLConnection:
         获取游标上下文管理器
         获取游标上下文管理器
         
         
         Args:
         Args:
-            cursorclass: 游标类型
+            cursorclass: 游标类型,默认为DictCursor
         
         
         Yields:
         Yields:
             MySQL 游标对象
             MySQL 游标对象
         """
         """
         conn = self._get_connection()
         conn = self._get_connection()
-        cursor = conn.cursor(cursorclass=cursorclass)
+        cursor = conn.cursor(cursorclass)
         
         
         try:
         try:
             yield cursor
             yield cursor

+ 1 - 1
utils/mysql/mysql_pool.py → src/utils/mysql/mysql_pool.py

@@ -10,7 +10,7 @@ MySQL 连接池配置
 import pymysql
 import pymysql
 from pymysql.cursors import DictCursor
 from pymysql.cursors import DictCursor
 from dbutils.pooled_db import PooledDB
 from dbutils.pooled_db import PooledDB
-from conf.settings import mysql_settings
+from src.conf.settings import mysql_settings
 
 
 # 单例装饰器
 # 单例装饰器
 class singleton:
 class singleton:

+ 0 - 0
utils/ragflow_sdk/param.json → src/utils/ragflow/__init__.py


+ 0 - 0
utils/ragflow/agent_service.py → src/utils/ragflow/agent_service.py


+ 0 - 0
utils/ragflow/chat_service.py → src/utils/ragflow/chat_service.py


+ 74 - 0
src/utils/ragflow/chunk_record.py

@@ -0,0 +1,74 @@
+"""
+RagFlow Chunk 上传记录管理模块
+
+该模块负责处理 RagFlow Chunk 上传记录的数据库操作,包括:
+- 记录 Chunk 上传任务到定时任务表
+- 提供统一的接口供外部调用
+"""
+import json
+from typing import Dict, Any, Optional
+from src.utils.mysql import get_global_mysql_client
+from datetime import datetime, timedelta
+
+class ChunkRecordService:
+    """Chunk 上传记录服务"""
+    
+    def __init__(self):
+        """初始化 Chunk 记录服务"""
+        self.mysql_client = get_global_mysql_client()
+    
+    def record_chunk_add(self, database_name: str, table_name: str, chunk_id: str,
+                         cond: Optional[str] = None, data: Dict[str, Any] = {}) -> None:
+        """
+        记录 Chunk 上传信息到 MySQL 定时任务表
+        
+        Args:
+            database_name: 目标数据库名称
+            table_name: 目标数据表名称
+            chunk_id: 上传成功返回的 Chunk ID
+            cond: 条件字符串,由调用方传递
+            data: 更新数据字典,由调用方传递
+        """
+        # 计算20秒后的时间
+        scheduled_time = datetime.now() + timedelta(seconds=60)
+
+        # 判断cond与data不能同时为空(NOTE(review): 当前条件仅在两者都为空时报错;若意图是两者必填,应改用 or —— 请与调用方确认)
+        if not cond and not data:
+            raise ValueError("cond and data must be provided")
+        
+        # 准备插入数据(所有记录status固定为"未执行")
+        insert_data = {
+            "database_name": database_name,
+            "table_name": table_name,
+            "chunk_id": chunk_id,
+            "cond": cond,
+            "update_data": json.dumps(data),
+            "scheduled_time": scheduled_time,
+            "status": "未执行"
+        }
+        
+        try:
+            # 插入记录到 MySQL 定时任务表
+            self.mysql_client.execute(
+                "INSERT INTO ragflow_chunk_record (database_name, table_name, chunk_id, cond, update_data, scheduled_time, status) "
+                "VALUES (%(database_name)s, %(table_name)s, %(chunk_id)s, %(cond)s, %(update_data)s, %(scheduled_time)s, %(status)s)",
+                insert_data
+            )
+        except Exception as e:
+            raise Exception(f"Failed to record chunk upload: {e}")
+
+# 创建全局实例
+_chunk_record_service = None
+
+def get_chunk_record_service() -> ChunkRecordService:
+    """
+    获取 Chunk 记录服务实例(单例模式)
+    
+    Returns:
+        ChunkRecordService 实例
+    """
+    global _chunk_record_service
+    if _chunk_record_service is None:
+        _chunk_record_service = ChunkRecordService()
+    return _chunk_record_service

+ 0 - 0
utils/ragflow/chunk_service.py → src/utils/ragflow/chunk_service.py


+ 0 - 0
utils/ragflow/dataset_service.py → src/utils/ragflow/dataset_service.py


+ 0 - 0
utils/ragflow/document_service.py → src/utils/ragflow/document_service.py


+ 0 - 0
utils/ragflow/file_service.py → src/utils/ragflow/file_service.py


+ 0 - 0
utils/ragflow/openai_service.py → src/utils/ragflow/openai_service.py


+ 9 - 9
utils/ragflow/ragflow_service.py → src/utils/ragflow/ragflow_service.py

@@ -5,15 +5,15 @@ from dataclasses import dataclass
 
 
 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
 
 
-from utils.http_client import HTTPClient
-from conf.settings import ragflow_settings
-from utils.ragflow.dataset_service import DatasetService
-from utils.ragflow.document_service import DocumentService
-from utils.ragflow.chunk_service import ChunkService
-from utils.ragflow.chat_service import ChatService
-from utils.ragflow.agent_service import AgentService
-from utils.ragflow.file_service import FileService
-from utils.ragflow.openai_service import OpenAICompatibleService
+from src.utils.http_client import HTTPClient
+from src.conf.settings import ragflow_settings
+from src.utils.ragflow.dataset_service import DatasetService
+from src.utils.ragflow.document_service import DocumentService
+from src.utils.ragflow.chunk_service import ChunkService
+from src.utils.ragflow.chat_service import ChatService
+from src.utils.ragflow.agent_service import AgentService
+from src.utils.ragflow.file_service import FileService
+from src.utils.ragflow.openai_service import OpenAICompatibleService
 
 
 @dataclass
 @dataclass
 class DocumentInfo:
 class DocumentInfo:

+ 6 - 6
workflow/search/dataset_search_workflow.py → src/workflow/search/dataset_search_workflow.py

@@ -2,12 +2,12 @@ from concurrent.futures import ThreadPoolExecutor
 from langgraph.graph import StateGraph, START, END
 from langgraph.graph import StateGraph, START, END
 from typing import List, Dict, Any
 from typing import List, Dict, Any
 from pydantic import BaseModel, Field, ConfigDict
 from pydantic import BaseModel, Field, ConfigDict
-from model.qwen_vl import QWenVLParser
-from model.openai_chat_model import OpenAIChatModel
-from utils.ragflow.ragflow_service import RAGFlowService
-from model.multimodal_embedding import Embedding
-from conf.settings import model_settings, vector_db_settings
-from utils.infinity import get_client
+from src.model.qwen_vl import QWenVLParser
+from src.model.openai_chat_model import OpenAIChatModel
+from src.utils.ragflow.ragflow_service import RAGFlowService
+from src.model.multimodal_embedding import Embedding
+from src.conf.settings import model_settings, vector_db_settings
+from src.utils.infinity import get_client
 from langfuse.langchain import CallbackHandler
 from langfuse.langchain import CallbackHandler
 
 
 # 定义工作流状态类
 # 定义工作流状态类

+ 0 - 0
test/test_mysql.py


+ 0 - 0
test_ragflow_http_api.py → test/test_ragflow_http_api.py


+ 0 - 0
test_search.py → test/test_search.py


+ 0 - 0
test_upload_tag.py → test/test_upload_tag.py


+ 0 - 9
utils/ragflow_sdk/__init__.py

@@ -1,9 +0,0 @@
-from .dataset_util import DataSetUtil
-from .document_util import DocumentUtil
-from .chunk_util import ChunkUtil
-
-__all__ = [
-    "DataSetUtil",
-    "DocumentUtil",
-    "ChunkUtil"
-]

+ 0 - 17
utils/ragflow_sdk/base_util.py

@@ -1,17 +0,0 @@
-from ragflow_sdk import RAGFlow
-from conf.settings import ragflow_settings
-
-
-class RAGFlowBaseUtil:
-    """
-    RAGFlow 工具类基础类,封装了 RAGFlow 客户端初始化逻辑
-    """
-    
-    def __init__(self):
-        """
-        初始化 RAGFlow 客户端
-        """
-        self.ragflow_client = RAGFlow(
-            api_key=ragflow_settings.ragflow_api_key,
-            base_url=ragflow_settings.ragflow_api_url
-        )

+ 0 - 80
utils/ragflow_sdk/chunk_util.py

@@ -1,80 +0,0 @@
-from typing import Optional
-from ragflow_sdk import Chunk
-from utils.ragflow_sdk.document_util import DocumentUtil
-from .base_util import RAGFlowBaseUtil
-
-
-
-class ChunkUtil(RAGFlowBaseUtil):
-
-    def __init__(self, dataset_id: str, document_id: str):
-        super().__init__()
-        self.dataset_id = dataset_id
-        self.document_id = document_id
-
-    def add_chunk(self, dataset_name: Optional[str] = None, document_id: Optional[str] = None, content: str = None, important_keywords: list[str] = []):
-        """
-        添加文档分块
-        """
-        try:
-            if not dataset_name or not document_id or not content:
-                raise ValueError("数据集名称、文档ID和内容不能为空")
-            chunk = DocumentUtil().get_document(dataset_name=dataset_name, document_id=document_id).add_chunk(content=content, important_keywords=important_keywords)
-            return chunk
-        except Exception as e:
-            print(f"添加文档分块失败: {e}")
-            raise e
-    
-    def delete_chunks(self, chunk_ids: list[str]):
-        """
-        删除文档分块
-        """
-        try:
-            DocumentUtil().get_document(dataset_name=dataset_name, document_id=document_id).delete_chunks(chunk_ids)
-        except Exception as e:
-            print(f"删除文档分块失败: {e}")
-            raise e
-    
-
-    def list_chunks(self, keywords: str = None, page: int = 1, page_size: int = 30, id : str = None) -> list[Chunk]:
-        """
-        列出文档分块
-        """
-        try:
-            chunks = DocumentUtil().get_document(dataset_name=dataset_name, document_id=document_id).list_chunks(keywords=keywords, page=page, page_size=page_size, id=id)
-            return chunks
-        except Exception as e:
-            print(f"列出文档分块失败: {e}")
-            raise e
-
-    def get_chunk(self, chunk_id: str) -> Chunk:
-        """
-        获取文档分块
-        """
-        _list = self.list_chunks(id=chunk_id)
-        if len(_list) > 0:
-            return _list[0]
-        raise Exception("Chunk %s not found" % chunk_id)
-    
-
-    def update_chunk(self, chunk_id: str, update_message: dict):
-        """
-        更新文档分块
-        """
-        try:
-            self.get_chunk(chunk_id=chunk_id).update(update_message)
-        except Exception as e:
-            print(f"更新文档分块失败: {e}")
-            raise e
-    
-    def retrieve(self, question: str, dataset_ids: list[str] = None, document_ids: list[str] = None, page: int = 1, page_size: int = 30, similarity_threshold: float = 0.2, vector_similarity_weight: float = 0.3, top_k: int = 1024, rerank_id: str = None, keyword: bool = False, cross_languages: list[str] = None, metadata_condition: dict = None) -> list[Chunk]:
-        """
-        检索文档分块
-        """
-        try:
-            chunks = self.ragflow_client.retrieve(question=question, dataset_ids=dataset_ids, document_ids=document_ids, page=page, page_size=page_size, similarity_threshold=similarity_threshold, vector_similarity_weight=vector_similarity_weight, top_k=top_k, rerank_id=rerank_id, keyword=keyword, cross_languages=cross_languages, metadata_condition=metadata_condition)
-            return chunks
-        except Exception as e:
-            print(f"检索文档分块失败: {e}")
-            raise e
-        

+ 0 - 45
utils/ragflow_sdk/conf/rag_parser_config.py

@@ -1,45 +0,0 @@
-class RagParserDefaults:
-
-    DATASET_PERMISSION="team"
-
-    DATASET_CHUNK_METHOD="naive"
-
-    DATASET_CONFIG_DICT = {
-            "chunk_token_num": 256,
-            "delimiter": "\n!?;。;!?",
-            "html4excel": False,
-            "layout_recognize": "Pro/Qwen/Qwen2.5-VL-7B-Instruct@SILICONFLOW",
-            "auto_keywords": 5,
-            "tag_kb_ids": [],
-            "topn_tags": 3,
-            "task_page_size": 4,
-            "raptor": {
-                "max_cluster": 64,
-                "max_token": 256,
-                "prompt": "Please summarize the following paragraphs. Be careful with the numbers, do not make things up. Paragraphs as following:\n      {cluster_content}\nThe above is the content you need to summarize.",
-                "random_seed": 0,
-                "threshold": 0.1,
-                "use_raptor": True
-            },
-            "graphrag": {
-                "resolution": True,
-                "use_graphrag": True,
-                "method": "general",
-                "entity_types": [
-                    "event",
-                    "Book",
-                    "Author",
-                    "Illustrator",
-                    "Series",
-                    "Theme",
-                    "Genre",
-                    "Character",
-                    "Setting",
-                    "AgeGroup",
-                    "Competency",
-                    "ArtStyle",
-                    "Award",
-                    "Publisher"
-                ]
-            }
-        }

+ 0 - 71
utils/ragflow_sdk/dataset_util.py

@@ -1,71 +0,0 @@
-from typing import Optional
-from ragflow_sdk import DataSet
-from conf.settings import ragflow_settings, model_settings
-from .conf.rag_parser_config import RagParserDefaults
-from .base_util import RAGFlowBaseUtil
-
-
-
-class DataSetUtil(RAGFlowBaseUtil):
-
-    def __init__(self):
-        super().__init__()
-
-    def create_dataset(self, chunk_method: str = RagParserDefaults.DATASET_CHUNK_METHOD,
-        dataset_name: Optional[str] = None, dataset_desc: str = "", 
-        embedding_model: str = f"{model_settings.embedding_model_name}@SILICONFLOW",
-        parser_config: dict = RagParserDefaults.DATASET_CONFIG_DICT):
-        """
-        创建数据集
-        """
-        # 封装数据集参数
-        parser_obj = DataSet.ParserConfig(self.ragflow_client, parser_config)
-        # 创建数据集
-        dataset = self.ragflow_client.create_dataset(
-            name=dataset_name,
-            description=dataset_desc,
-            embedding_model=embedding_model,
-            permission=RagParserDefaults.DATASET_PERMISSION, 
-            chunk_method= chunk_method,
-            parser_config=parser_obj
-        )
-        return dataset
-
-    def delete_dataset(self, ids: list[str] | None = None):
-        """
-        删除数据集
-        """
-        try:
-            if ids is not None:
-                self.ragflow_client.delete_datasets(ids)
-        except Exception as e:
-            print(f"删除数据集失败: {e}")
-
-    def get_dataset(self, name: Optional[str] = None):
-        """
-        获取数据集
-        """
-        try:
-            dataset = self.ragflow_client.get_dataset(name=name)
-        except Exception as e:
-            print(f"获取数据集失败: {e}")
-            return None
-        return dataset
-                
-    def list_datasets(self, page: int = 1, page_size: int = 30, orderby: str = "create_time", desc: bool = True, id: str | None = None, name: str | None = None) -> list[DataSet]:
-        """
-        获取数据集列表
-        """
-        try:
-            dataset_list = self.ragflow_client.list_datasets(
-                page=page,
-                page_size=page_size,
-                orderby=orderby,
-                desc=desc,
-                id=id,
-                name=name,
-            )
-        except Exception as e:
-            print(f"获取数据集列表失败: {e}")
-            return None
-        return dataset_list

+ 0 - 93
utils/ragflow_sdk/document_util.py

@@ -1,93 +0,0 @@
-from token import OP
-from typing import Optional
-from ragflow_sdk import DataSet, Document
-from conf.settings import ragflow_settings
-from .base_util import RAGFlowBaseUtil
-from common.models.pagination import Pagination
-
-
-
-class DocumentUtil(RAGFlowBaseUtil):
-
-    def __init__(self):
-        super().__init__()
-        
-
-    def upload_documents(self, dataset_name: Optional[str] = None, document_list: list[dict] = None):
-        """
-        上传文档到数据集
-        """
-        try:
-            if not document_list:
-                raise ValueError("文档列表不能为空")
-            # 上传文档到数据集
-            doc_list = self.ragflow_client.get_dataset(name=dataset_name).upload_documents(document_list)
-            return doc_list
-        except Exception as e:
-            print(f"上传文档到数据集失败: {e}")
-            raise e
-
-    def list_documents(self, dataset_name: Optional[str] = None, pagination: Pagination = Pagination(), id: str = None, keywords: str = None) -> list[Document]:
-        """
-        列出数据集的文档
-        """
-        try:
-            doc_list = self.ragflow_client.get_dataset(name=dataset_name).list_documents(id=id, keywords=keywords, **pagination.to_dict())
-            return doc_list
-        except Exception as e:
-            print(f"列出数据集文档失败: {e}")
-            raise e
-
-    def get_document(self, dataset_name: Optional[str] = None, document_id: Optional[str] = None) -> Document:
-        """
-        获取文档
-        """
-        _list = self.list_documents(dataset_name=dataset_name, id=document_id)
-        if len(_list) > 0:
-            return _list[0]
-        raise Exception("Document %s not found" % document_id)
-
-    # Document.update(update_message:dict)
-    def update_document(self, document: Document, update_message: dict):
-        """
-        更新文档
-        """
-        try:
-            document.update(update_message)
-        except Exception as e:
-            print(f"更新文档失败: {e}")
-            raise e 
-
-    def delete_document(self, dataset_name: Optional[str] = None, ids: list[str] = None):
-        """
-        删除文档
-        """
-        try:
-            if not ids:
-                raise ValueError("文档ID列表不能为空")
-            self.ragflow_client.get_dataset(name=dataset_name).delete_documents(ids=ids)
-        except Exception as e:
-            print(f"删除文档失败: {e}")
-            raise e
-
-    def async_parse_documents(self, dataset_name: Optional[str] = None,document_ids: list[str] = None):
-        """
-        异步解析文档
-        """
-        try:
-            self.ragflow_client.get_dataset(name=dataset_name).async_parse_documents(document_ids=document_ids)
-        except Exception as e:
-            print(f"异步解析文档失败: {e}")
-            raise e
-    
-    def parse_documents(self, dataset_name: Optional[str] = None,document_ids: list[str] = None) -> list[tuple[str, str, int, int]]:
-        """
-        解析文档
-        """
-        try:
-            doc_list = self.ragflow_client.get_dataset(name=dataset_name).parse_documents(document_ids=document_ids)
-            return doc_list
-        except Exception as e:
-            print(f"解析文档失败: {e}")
-            raise e
-