|
|
@@ -1,19 +1,15 @@
|
|
|
-import sys
|
|
|
import os
|
|
|
import concurrent.futures
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
-# 添加项目根目录到Python路径
|
|
|
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
|
|
|
-
|
|
|
from langgraph.graph import StateGraph, START, END
|
|
|
-from langgraph.graph.message import add_messages
|
|
|
from typing import List, Dict, Any
|
|
|
from pydantic import BaseModel, Field, ConfigDict
|
|
|
from parser.pdf_parser.pdf_splitter import PDFSplitter
|
|
|
from model.qwen_vl import QWenVLParser
|
|
|
+from utils.ragflow_sdk import DataSetUtil, DocumentUtil, ChunkUtil
|
|
|
from utils.ragflow.ragflow_service import RAGFlowService
|
|
|
from model.multimodal_embedding import Embedding
|
|
|
-from conf.config import ModelConfig, VectorDBConfig
|
|
|
+from conf.settings import model_settings, vector_db_settings
|
|
|
from utils.infinity import get_client
|
|
|
|
|
|
# 定义工作流状态类
|
|
|
@@ -22,7 +18,11 @@ class PDFParsingState(BaseModel):
|
|
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
|
|
pdf_path: str = Field(..., description="PDF文件路径")
|
|
|
dataset_id: str = Field(..., description="数据集ID")
|
|
|
- ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFLOW服务")
|
|
|
+ page_dataset_id: str = Field(..., description="页面数据集ID")
|
|
|
+ ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFlow服务实例")
|
|
|
+ dataset_util: DataSetUtil = Field(default_factory=DataSetUtil, description="数据集工具类实例")
|
|
|
+ document_util: DocumentUtil = Field(default_factory=DocumentUtil, description="文档工具类实例")
|
|
|
+ chunk_util: ChunkUtil = Field(default_factory=ChunkUtil, description="分块工具类实例")
|
|
|
embedding_model: Embedding = Field(default_factory=Embedding, description="多模态嵌入模型实例")
|
|
|
document_id: str = Field(default="", description="上传后的文档ID")
|
|
|
split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
|
|
|
@@ -46,6 +46,7 @@ class PDFParsingWorkflow:
|
|
|
"""
|
|
|
self.model_name = model_name
|
|
|
self.workflow = self._build_workflow()
|
|
|
+
|
|
|
|
|
|
def _build_workflow(self):
|
|
|
"""构建langgraph工作流,实现基于条件路由的并行处理"""
|
|
|
@@ -97,6 +98,37 @@ class PDFParsingWorkflow:
|
|
|
# 编译工作流
|
|
|
return graph.compile()
|
|
|
|
|
|
+ def get_ragflow_dataset(self, dataset_name: str) -> str:
|
|
|
+ """获取RAGFLOW数据集ID"""
|
|
|
+ try:
|
|
|
+ dataset_id = self.dataset_util.get_dataset(name=dataset_name)  # NOTE(review): self.dataset_util is never assigned in __init__ (only model_name/workflow) — presumably this lives on the state (state.dataset_util); verify
|
|
|
+ print(f"数据集 {dataset_name} 的ID为: {dataset_id}")
|
|
|
+ return dataset_id
|
|
|
+ except Exception as e:
|
|
|
+ print(f"获取数据集ID时出错: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def create_ragflow_dataset(self, state: PDFParsingState, dataset_name: str) -> str:
|
|
|
+ """创建RAGFLOW数据集"""
|
|
|
+ if state.dataset_id:
|
|
|
+ print(f"数据集 {dataset_name} 已存在,数据集ID: {state.dataset_id}")
|
|
|
+ return state.dataset_id
|
|
|
+
|
|
|
+ print(f"开始创建数据集: {dataset_name}")
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 创建数据集
|
|
|
+ dataset_id = self.dataset_util.create_dataset(
|
|
|
+ chunk_method="naive",
|
|
|
+ dataset_name=dataset_name,
|
|
|
+ dataset_desc="",
|
|
|
+ )
|
|
|
+ print(f"数据集创建成功,数据集ID: {dataset_id}")
|
|
|
+ return dataset_id
|
|
|
+ except Exception as e:
|
|
|
+ print(f"创建数据集时出错: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
def _upload_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
|
|
|
"""RAGFLOW上传文档节点"""
|
|
|
print(f"开始上传文档到数据集 {state.dataset_id}: {state.pdf_path}")
|
|
|
@@ -107,13 +139,20 @@ class PDFParsingWorkflow:
|
|
|
dataset_id=state.dataset_id,
|
|
|
file_path=state.pdf_path
|
|
|
)
|
|
|
+ # 上传文档
|
|
|
+ document_info_list2 = state.ragflow_service.upload_document(
|
|
|
+ dataset_id=state.page_dataset_id,
|
|
|
+ file_path=state.pdf_path
|
|
|
+ )
|
|
|
|
|
|
# 检查响应
|
|
|
if document_info_list and len(document_info_list) > 0:
|
|
|
document_id = document_info_list[0]["id"]
|
|
|
+ page_document_id = document_info_list2[0]["id"]
|
|
|
print(f"文档上传成功,文档ID: {document_id}")
|
|
|
return {
|
|
|
- "document_id": document_id
|
|
|
+ "document_id": document_id,
|
|
|
+ "page_document_id": page_document_id
|
|
|
}
|
|
|
else:
|
|
|
print("文档上传失败: 未返回有效的文档信息")
|
|
|
@@ -278,12 +317,38 @@ class PDFParsingWorkflow:
|
|
|
# 所以这里总是返回"complete"
|
|
|
return "complete"
|
|
|
|
|
|
+ def create_ragflow_chunk(self, state: PDFParsingState):
|
|
|
+ """单页上传节点"""
|
|
|
+ print(f"开始单页上传,共 {len(state.parsed_results)} 页")
|
|
|
+
|
|
|
+ # 遍历所有解析结果,上传单页
|
|
|
+ for parsed_result in state.parsed_results:
|
|
|
+ page_number = parsed_result.get("page_number")
|
|
|
+ text = parsed_result.get("content", "")
|
|
|
+ image = state.split_pages[page_number - 1].get("image")
|
|
|
+
|
|
|
+ # 上传单页到RagFlow Chunk
|
|
|
+ chunk = state.chunk_util.add_chunk(
|
|
|
+ dataset_name=state.dataset_name,  # TODO(review): PDFParsingState declares no dataset_name field — confirm where this comes from
|
|
|
+ document_id=state.page_document_id,
|
|
|
+ content=text,
|
|
|
+ )
|
|
|
+
|
|
|
+ infinity_client = get_client()
|
|
|
+ infinity_client.update(database_name=state.dataset_name, table_name="", cond=f"id = {chunk['id']}", data={"tag_kwd": tag_name})  # TODO(review): tag_name is undefined in this scope and table_name is empty — both must be supplied; assumes add_chunk returns a dict with "id" — confirm
|
|
|
+
|
|
|
+ # 检查响应
|
|
|
+ if chunk and chunk.get("id"):  # was undefined name `document_info`; `chunk` is the add_chunk result — assumes it is dict-like, confirm
|
|
|
+ print(f"第 {page_number} 页上传成功,文档ID: {chunk['id']}")
|
|
|
+ else:
|
|
|
+ print(f"第 {page_number} 页上传失败")
|
|
|
+
|
|
|
def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
|
|
|
"""向量化入库节点"""
|
|
|
print(f"开始向量化入库,共 {len(state.parsed_results)} 页")
|
|
|
|
|
|
# 创建索引(如果不存在)
|
|
|
- index_name = f"{VectorDBConfig.get_infinity_table_name()}"
|
|
|
+ index_name = f"{vector_db_settings.infinity_table_name}"
|
|
|
state.vector_db.create_index(index_name)
|
|
|
|
|
|
# 准备要入库的文档列表
|
|
|
@@ -349,13 +414,13 @@ class PDFParsingWorkflow:
|
|
|
"is_complete": True
|
|
|
}
|
|
|
|
|
|
- def run(self, pdf_path: str, dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]:
|
|
|
+ def run(self, pdf_path: str, page_dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
运行PDF解析工作流
|
|
|
|
|
|
Args:
|
|
|
pdf_path: PDF文件路径
|
|
|
- dataset_id: 数据集ID
|
|
|
+ page_dataset_id: 页面数据集ID
|
|
|
ragflow_api_url: RAGFLOW API URL
|
|
|
rag_flow_api_key: RAGFLOW API密钥
|
|
|
|
|
|
@@ -364,9 +429,12 @@ class PDFParsingWorkflow:
|
|
|
"""
|
|
|
initial_state = PDFParsingState(
|
|
|
pdf_path=pdf_path,
|
|
|
- dataset_id=dataset_id,
|
|
|
- embedding_model=Embedding(model_name=ModelConfig.get_multimodal_embedding_model_name(), api_key=ModelConfig.get_dashscope_api_key()),
|
|
|
- ragflow_service=RAGFlowService(base_url=ragflow_api_url, api_key=rag_flow_api_key)
|
|
|
+ page_dataset_id=page_dataset_id,  # NOTE(review): state.dataset_id is still a required field (Field(...)) but is no longer passed here — pydantic validation will fail; verify
|
|
|
+ embedding_model=Embedding(model_name=model_settings.multimodal_embedding_model_name, api_key=model_settings.dashscope_api_key),
|
|
|
+ dataset_util=DataSetUtil(),
|
|
|
+ document_util=DocumentUtil(),
|
|
|
+ chunk_util=ChunkUtil(),
|
|
|
+ ragflow_service=RAGFlowService(api_url=ragflow_api_url, api_key=rag_flow_api_key)
|
|
|
)
|
|
|
result = self.workflow.invoke(initial_state)
|
|
|
|