# workflow.py
  1. import sys
  2. import os
  3. import concurrent.futures
  4. from concurrent.futures import ThreadPoolExecutor
  5. # 添加项目根目录到Python路径
  6. sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
  7. from langgraph.graph import StateGraph, START, END
  8. from langgraph.graph.message import add_messages
  9. from typing import List, Dict, Any
  10. from pydantic import BaseModel, Field, ConfigDict
  11. from parser.pdf_parser.pdf_splitter import PDFSplitter
  12. from model.qwen_vl import QWenVLParser
  13. from utils.ragflow.ragflow_service import RAGFlowService
  14. from utils.vector_db import VectorDBFactory
  15. from model.multimodal_embedding import Embedding
  16. from conf.config import ModelConfig, VectorDBConfig
  17. from utils.minio.image_util import ImageUtil
# Workflow state definition.
class PDFParsingState(BaseModel):
    """State carried through the PDF-parsing LangGraph workflow.

    Each node receives this state and returns a partial dict of updated
    fields; LangGraph merges those updates back into the state.
    """
    # Allow non-pydantic types (service clients, DB handles, images) as field values.
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # --- required inputs ---
    pdf_path: str = Field(..., description="PDF文件路径")
    dataset_id: str = Field(..., description="数据集ID")
    # --- service handles (default-constructed unless overridden by the caller) ---
    ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFLOW服务")
    vector_db: Any = Field(default_factory=VectorDBFactory.get_vector_db, description="向量数据库实例")
    embedding_model: Embedding = Field(default_factory=Embedding, description="多模态嵌入模型实例")
    # --- intermediate / output fields populated by workflow nodes ---
    document_id: str = Field(default="", description="上传后的文档ID")
    split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
    current_page: Dict[str, Any] = Field(default_factory=dict, description="当前处理的页面")
    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表")
    vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="向量化结果列表")
    processed_pages: int = Field(default=0, description="已处理的页面数量")
    vectorized_pages: int = Field(default=0, description="已向量化的页面数量")
    is_complete: bool = Field(default=False, description="是否处理完成")
  35. # 创建工作流构建器
  36. class PDFParsingWorkflow:
  37. """PDF扫描件拆分解析工作流"""
  38. def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
  39. """
  40. 初始化PDF解析工作流
  41. Args:
  42. model_name: QWEN VL模型名称
  43. """
  44. self.model_name = model_name
  45. self.workflow = self._build_workflow()
  46. def _build_workflow(self):
  47. """构建langgraph工作流,实现基于条件路由的并行处理"""
  48. # 创建状态图
  49. graph = StateGraph(PDFParsingState)
  50. # 添加上传文档节点
  51. graph.add_node("upload_document", self._upload_document_node)
  52. # 添加解析文档节点
  53. graph.add_node("parse_document", self._parse_document_node)
  54. # 添加拆分PDF节点
  55. graph.add_node("split_pdf", self._split_pdf_node)
  56. # 添加解析图像节点
  57. graph.add_node("parse_image", self._parse_image_node)
  58. # 添加向量化入库节点
  59. graph.add_node("vectorize_store", self._vectorize_store_node)
  60. # 添加完成节点
  61. graph.add_node("complete", self._complete_node)
  62. # 定义边
  63. # 定义RagFLow解析文档
  64. graph.add_edge(START, "upload_document")
  65. # 添加解析文档边
  66. graph.add_edge("upload_document", "parse_document")
  67. graph.add_edge("parse_document", "split_pdf")
  68. # 定义图片解析边
  69. graph.add_edge("split_pdf", "parse_image")
  70. # 添加条件边:判断是否继续解析
  71. graph.add_conditional_edges(
  72. "parse_image",
  73. self._should_continue_parsing,
  74. {
  75. "continue": "parse_image",
  76. "complete": "vectorize_store"
  77. }
  78. )
  79. # 添加向量化入库边
  80. graph.add_edge("vectorize_store", "complete")
  81. graph.add_edge("complete", END)
  82. # 编译工作流
  83. return graph.compile()
  84. def _upload_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
  85. """RAGFLOW上传文档节点"""
  86. print(f"开始上传文档到数据集 {state.dataset_id}: {state.pdf_path}")
  87. try:
  88. # 上传文档
  89. document_info_list = state.ragflow_service.upload_document(
  90. dataset_id=state.dataset_id,
  91. file_path=state.pdf_path
  92. )
  93. # 检查响应
  94. if document_info_list and len(document_info_list) > 0:
  95. document_id = document_info_list[0]["id"]
  96. print(f"文档上传成功,文档ID: {document_id}")
  97. return {
  98. "document_id": document_id
  99. }
  100. else:
  101. print("文档上传失败: 未返回有效的文档信息")
  102. raise Exception("文档上传失败: 未返回有效的文档信息")
  103. except Exception as e:
  104. print(f"上传文档时出错: {str(e)}")
  105. raise
  106. def _parse_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
  107. """RAGFLOW文档解析节点"""
  108. print(f"开始解析文档 {state.dataset_id}: {state.document_id}")
  109. try:
  110. # 解析文档
  111. parse_success = state.ragflow_service.parse_document(
  112. dataset_id=state.dataset_id,
  113. document_ids=[state.document_id]
  114. )
  115. # 检查响应parse_success为bool
  116. if parse_success:
  117. print(f"文档解析成功,文档ID: {state.document_id}")
  118. # 返回空列表,因为parsed_results字段期望是列表类型
  119. return {
  120. "parsed_results": []
  121. }
  122. else:
  123. print("文档解析失败: 未返回有效的解析结果")
  124. raise Exception("文档解析失败: 未返回有效的解析结果")
  125. except Exception as e:
  126. print(f"解析文档时出错: {str(e)}")
  127. raise
  128. def _split_pdf_node(self, state: PDFParsingState) -> Dict[str, Any]:
  129. """拆分PDF节点"""
  130. print(f"开始拆分PDF: {state.pdf_path}")
  131. # 拆分PDF
  132. splitter = PDFSplitter()
  133. split_pages = splitter.split_pdf(state.pdf_path)
  134. print(f"PDF拆分完成,共 {len(split_pages)} 页")
  135. return {
  136. "split_pages": split_pages,
  137. "parsed_results": [],
  138. "processed_pages": 0,
  139. "is_complete": False
  140. }
  141. def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
  142. """解析单个页面(用于并行处理)"""
  143. prompt = """
  144. 角色定位:你是一位顶尖的儿童绘本分析师与视觉工程专家,擅长将插画视觉信息转化为高精度的结构化元数据。
  145. 任务描述:请深度解析提供的绘本页面,不仅提取基本要素,还要进行“像素级”的特征拆解。重点关注角色的微表情、服饰纹理、环境光效、构图视角及整体艺术风格。
  146. 提取维度:
  147. 艺术风格 (Style):包括笔触(如水彩、蜡笔)、线条粗细、整体色调偏好。
  148. 角色特征 (Character):五官细节、肢体动作的动态感、衣物材质、标志性配饰。
  149. 空间构图 (Composition):透视关系(仰拍/俯拍)、视觉焦点、前景/中景/背景的层次。
  150. 物品与环境 (Object & Environment):物体的精确形状、材质光泽、环境中的自然元素(风吹草动的方向等)。
  151. 内容标签 (content_tags):请从以下三个维度进行打标:
  152. 主题维度(如:自然探索、家庭学校、科学科普、传统文化)
  153. 具体对象(如:昆虫、交通工具、五官、家具)
  154. 情感氛围(如:惊喜、友爱、好奇、安静)
  155. 能力标签 (ability_tags):请严格参照以下教育能力模型,根据图中元素体现的教育价值进行选择:
  156. [语言表达、逻辑思维、数理逻辑、空间想象、艺术创造、身体协调、自我认知、社会交往、自然观察、情绪管理]。
  157. 输出约束:
  158. 保持描述积极向上,符合0-12岁儿童阅读的安全标准。
  159. 描述精度:单条描述需包含具体视觉属性(颜色、形状、质感),字数控制在50字以内。
  160. 格式要求:严谨按照指定的JSON结构输出。
  161. json格式:
  162. {
  163. "page_meta": {
  164. "page_number": 1,
  165. "content_text": "页面原文本内容",
  166. "overall_style": {
  167. "art_medium": "艺术媒介(如:手绘水彩、矢量平涂、3D渲染)",
  168. "color_palette": ["主色调1", "主色调2"],
  169. "lighting": "光影描述(如:柔和侧光、清晨自然光)",
  170. "composition": "构图(如:三分法、对角线构图、大远景)"
  171. }
  172. },
  173. "elements": [
  174. {
  175. "element_name": "元素名称(如:小兔子)",
  176. "character_name": "角色名称(如果有,没有的话,角色名称为空字符串)",
  177. "category": "分类(角色/场景/道具)",
  178. "spatial_layer": "所在层级(前景/中景/背景)",
  179. "visual_attributes": {
  180. "appearance": "外貌细节描述(发型、五官、材质感)",
  181. "action_emotion": "行为动作与情感流露",
  182. "color_detail": "像素级颜色描述(如:淡茱萸粉、薄荷绿)",
  183. "ability_tag": "如果为角色,其表现出的正面能力/特质"
  184. },
  185. "content_tags": {
  186. "theme": ["自然", "社交", "生活常识"],
  187. "object": ["动物", "服装", "植物"],
  188. "emotion": ["快乐", "勇敢"]
  189. },
  190. "ability_tags": ["语言表达", "逻辑思维", "自我认知"],
  191. "description": "综合性简洁描述(50字内)"
  192. }
  193. ]
  194. }
  195. """
  196. page_number = page["page_number"]
  197. image = page["image"]
  198. print(f"开始解析第 {page_number} 页")
  199. # 使用QWEN VL模型解析图像
  200. parser = QWenVLParser(model_name)
  201. result = parser.parse_image(image, page_number, prompt)
  202. print(f"第 {page_number} 页解析完成")
  203. return result
  204. def _parse_image_node(self, state: PDFParsingState) -> Dict[str, Any]:
  205. """解析图像节点,使用并行处理"""
  206. if not state.split_pages:
  207. return state.dict()
  208. print(f"开始并行解析 {len(state.split_pages)} 页")
  209. parsed_results = []
  210. # 使用ThreadPoolExecutor实现并行处理
  211. with ThreadPoolExecutor(max_workers=5, thread_name_prefix="parse_page_") as executor:
  212. # 提交所有页面解析任务
  213. future_to_page = {
  214. executor.submit(self._parse_single_page, page, self.model_name): page
  215. for page in state.split_pages
  216. }
  217. # 收集解析结果
  218. for future in concurrent.futures.as_completed(future_to_page):
  219. try:
  220. result = future.result()
  221. parsed_results.append(result)
  222. except Exception as e:
  223. page = future_to_page[future]
  224. print(f"解析第 {page['page_number']} 页时出错: {str(e)}")
  225. # 按页码排序结果
  226. parsed_results.sort(key=lambda x: x["page_number"])
  227. print(f"所有页面解析完成,共解析 {len(parsed_results)} 页")
  228. return {
  229. "split_pages": state.split_pages, # 保留split_pages,以便后续访问图片
  230. "parsed_results": parsed_results,
  231. "processed_pages": len(parsed_results),
  232. "is_complete": True
  233. }
  234. def _should_continue_parsing(self, state: PDFParsingState) -> str:
  235. """判断是否继续解析"""
  236. # 由于我们使用了并行处理,parse_image_node会一次性处理所有页面
  237. # 所以这里总是返回"complete"
  238. return "complete"
  239. def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
  240. """向量化入库节点"""
  241. print(f"开始向量化入库,共 {len(state.parsed_results)} 页")
  242. # 创建索引(如果不存在)
  243. index_name = f"{VectorDBConfig.get_infinity_table_name()}"
  244. state.vector_db.create_index(index_name)
  245. # 准备要入库的文档列表
  246. documents_to_store = []
  247. # 获取文件名和总页数
  248. file_name = os.path.basename(state.pdf_path)
  249. file_page_count = len(state.split_pages)
  250. # 遍历所有解析结果,生成向量化文档
  251. for i, parsed_result in enumerate(state.parsed_results):
  252. try:
  253. page_number = parsed_result.get("page_number")
  254. text = parsed_result.get("content", "")
  255. image = state.split_pages[i].get("image")
  256. image_path = state.split_pages[i].get("image_path")
  257. # 获取多模态嵌入向量
  258. print(f"正在生成第 {page_number} 页的多模态嵌入...")
  259. embedding = state.embedding_model.get_multimodal_embedding(text, image)
  260. # 生成1024维稠密向量(如果嵌入向量维度不是1024,这里需要处理)
  261. dense_vector_1024 = embedding[:1024] # 取前1024维
  262. # 创建文档
  263. document = {
  264. "id": f"{state.document_id}_{page_number}",
  265. "file_name": file_name,
  266. "file_page_count": file_page_count,
  267. "page_number": page_number,
  268. "content": text,
  269. "image_path": image_path,
  270. "dense_vector_1024": dense_vector_1024,
  271. "dataset_id": state.dataset_id,
  272. "document_id": state.document_id
  273. }
  274. documents_to_store.append(document)
  275. print(f"第 {page_number} 页向量化完成")
  276. except Exception as e:
  277. print(f"第 {i+1} 页向量化失败: {str(e)}")
  278. # 批量入库
  279. if documents_to_store:
  280. print(f"开始入库,共 {len(documents_to_store)} 个文档")
  281. result = state.vector_db.bulk_insert(index_name, documents_to_store)
  282. print(f"入库结果: {result}")
  283. return {
  284. "vectorized_results": documents_to_store,
  285. "vectorized_pages": len(documents_to_store),
  286. "is_complete": True
  287. }
  288. def _complete_node(self, state: PDFParsingState) -> Dict[str, Any]:
  289. """完成节点"""
  290. print(f"PDF解析工作流完成,共解析 {len(state.parsed_results)} 页,向量化 {state.vectorized_pages} 页")
  291. # 判断ragflow是否解析成功
  292. return {
  293. "is_complete": True
  294. }
  295. def run(self, pdf_path: str, dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]:
  296. """
  297. 运行PDF解析工作流
  298. Args:
  299. pdf_path: PDF文件路径
  300. dataset_id: 数据集ID
  301. ragflow_api_url: RAGFLOW API URL
  302. rag_flow_api_key: RAGFLOW API密钥
  303. Returns:
  304. Dict: 包含最终状态的字典
  305. """
  306. initial_state = PDFParsingState(
  307. pdf_path=pdf_path,
  308. dataset_id=dataset_id,
  309. embedding_model=Embedding(model_name=ModelConfig.get_multimodal_embedding_model_name(), api_key=ModelConfig.get_dashscope_api_key()),
  310. ragflow_service=RAGFlowService(base_url=ragflow_api_url, api_key=rag_flow_api_key)
  311. )
  312. result = self.workflow.invoke(initial_state)
  313. # 检查结果类型,如果是字典直接返回,否则调用dict()方法
  314. if isinstance(result, dict):
  315. return result
  316. else:
  317. return result.dict()