image_parsing_workflow.py

#!/usr/bin/env python3
"""
Image parsing workflow.
"""
import sys
import os
import io
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import requests

# Add the project root directory to the Python path.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from langgraph.graph import StateGraph, START, END
from typing import List, Dict, Any
from pydantic import BaseModel, Field, ConfigDict
from model.qwen_vl import QWenVLParser
from utils.ragflow.ragflow_service import RAGFlowService
from utils.vector_db import VectorDBFactory
from model.multimodal_embedding import Embedding
from utils.minio.image_util import image_util
from conf.config import ModelConfig


# Workflow state definition.
class ImageParsingState(BaseModel):
    """State for the image parsing workflow."""
    model_config = ConfigDict(arbitrary_types_allowed=True)

    zip_file_path: str = Field(..., description="Path to the image archive (zip)")
    book_name: str = Field(..., description="Book title")
    dataset_id: str = Field(..., description="Dataset ID")
    ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFlow service")
    vector_db: Any = Field(default_factory=VectorDBFactory.get_vector_db, description="Vector database instance")
    embedding_model: Embedding = Field(default_factory=Embedding, description="Multimodal embedding model instance")
    document_id: str = Field(default="", description="Document ID")
    split_images: List[Dict[str, Any]] = Field(default_factory=list, description="Per-page image list with image URL and page number")
    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="Parsed results, one per page")
    vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="Vectorized documents")
    processed_images: int = Field(default=0, description="Number of images parsed so far")
    vectorized_images: int = Field(default=0, description="Number of images vectorized so far")
    is_complete: bool = Field(default=False, description="Whether processing is complete")


# Workflow builder.
class ImageParsingWorkflow:
    """Image parsing workflow."""

    def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
        """
        Initialize the image parsing workflow.

        Args:
            model_name: Name of the Qwen VL model.
        """
        self.model_name = model_name
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build the LangGraph workflow with conditional routing."""
        # Create the state graph.
        graph = StateGraph(ImageParsingState)

        # Add nodes.
        graph.add_node("upload_images", self._upload_images_node)
        graph.add_node("parse_image", self._parse_image_node)
        graph.add_node("vectorize_store", self._vectorize_store_node)
        graph.add_node("complete", self._complete_node)

        # Define edges.
        graph.add_edge(START, "upload_images")
        graph.add_edge("upload_images", "parse_image")

        # Conditional edge: decide whether to keep parsing.
        graph.add_conditional_edges(
            "parse_image",
            self._should_continue_parsing,
            {
                "continue": "parse_image",
                "complete": "vectorize_store",
            },
        )
        graph.add_edge("vectorize_store", "complete")
        graph.add_edge("complete", END)

        # Compile the workflow.
        return graph.compile()
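
    # Graph topology (as wired in _build_workflow above; the "continue"
    # branch exists but is never taken, see _should_continue_parsing):
    #
    #   START -> upload_images -> parse_image --"complete"--> vectorize_store -> complete -> END
    #                                ^     |
    #                                +-----+ "continue" (unused)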

    def _upload_images_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Upload node: unpack the image archive via image_util."""
        print(f"Processing image archive: {state.zip_file_path}")
        try:
            # Unpack the archive and upload the images, returning their URLs.
            image_urls = image_util.process_image_zip(
                state.zip_file_path,
                state.book_name,
            )
            print(f"Archive processed, {len(image_urls)} images in total")

            # Build split_images in the same format as the PDF parsing workflow.
            split_images = []
            for i, url in enumerate(image_urls):
                split_images.append({
                    "page_number": i + 1,
                    "image_url": url,
                    "image": None,  # Loaded lazily at parse time.
                })

            return {
                "split_images": split_images,
                "processed_images": 0,
                "is_complete": False,
            }
        except Exception as e:
            print(f"Error while processing the image archive: {e}")
            raise
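
    # Illustrative shape of one split_images entry produced above (the URL
    # below is a hypothetical example, not a real endpoint):
    #   {"page_number": 1, "image_url": "http://minio.example/demo_book/page_1.png", "image": None}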

    def _parse_single_page(self, image_info: Dict[str, Any], model_name: str) -> Dict[str, Any]:
        """Parse a single image (used for parallel processing)."""
        prompt = """
You are a creator of illustrated children's books whose content is suitable for children aged 0-12.
Task: based on the existing illustration and text, extract the elements, actions, and emotions in the illustration, and describe each element independently.
Note: the descriptions must be positive and consistent with core socialist values.
Output requirements:
1. Output JSON with the structure:
{
    "page_number": page number,
    "content": original text content of the page,
    "elements": [
        {
            "element": "element description",
            "description": "detailed description"
        },
        ...
    ]
}
2. Each element description must be concise, no more than 50 Chinese characters.
3. Each element description must relate to an element in the illustration and not exist independently of it.
4. Each element description must conform to social values and contain no negative or immoral content.
"""
        page_number = image_info["page_number"]
        image_url = image_info["image_url"]
        print(f"Parsing page {page_number}, image URL: {image_url}")
        try:
            # Load the image from the URL (a single request, decoded from the
            # response body; the original issued a second, redundant request).
            response = requests.get(image_url, timeout=30)
            response.raise_for_status()
            image = Image.open(io.BytesIO(response.content))

            # Parse the image with the Qwen VL model.
            parser = QWenVLParser(model_name)
            result = parser.parse_image(image, page_number, prompt)
            print(f"Page {page_number} parsed")
            return result
        except Exception as e:
            print(f"Error while parsing page {page_number}: {e}")
            raise

    def _parse_image_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Parse node: process all images in parallel."""
        if not state.split_images:
            return state.model_dump()

        print(f"Parsing {len(state.split_images)} images in parallel")
        parsed_results = []

        # Run the per-page parses concurrently with a thread pool.
        with ThreadPoolExecutor(max_workers=4) as executor:
            # Submit one parsing task per image.
            future_to_image = {
                executor.submit(self._parse_single_page, image_info, self.model_name): image_info
                for image_info in state.split_images
            }
            # Collect results as they complete.
            for future in concurrent.futures.as_completed(future_to_image):
                try:
                    result = future.result()
                    parsed_results.append(result)
                except Exception as e:
                    image_info = future_to_image[future]
                    print(f"Error while parsing page {image_info['page_number']}: {e}")

        # Sort the results by page number.
        parsed_results.sort(key=lambda x: x["page_number"])
        print(f"All images parsed, {len(parsed_results)} results")
        return {
            "split_images": state.split_images,  # Keep split_images so later nodes can access the images.
            "parsed_results": parsed_results,
            "processed_images": len(parsed_results),
            "is_complete": True,
        }

    def _should_continue_parsing(self, state: ImageParsingState) -> str:
        """Decide whether to keep parsing."""
        # parse_image_node handles every image in one parallel pass,
        # so this router always returns "complete".
        return "complete"

    def _vectorize_store_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Vectorize-and-store node."""
        print(f"Vectorizing {len(state.parsed_results)} images")

        # Create the index if it does not exist.
        index_name = f"image_documents_{state.dataset_id}"
        state.vector_db.create_index(index_name)

        # Documents to be stored.
        documents_to_store = []

        # File name and total page count.
        file_name = f"{state.book_name}.zip"
        file_page_count = len(state.split_images)

        # Look up image URLs by page number: parsed_results may be missing
        # pages that failed to parse, so positional indexing could misalign.
        url_by_page = {img["page_number"]: img["image_url"] for img in state.split_images}

        # Build one vectorized document per parsed result.
        for parsed_result in state.parsed_results:
            page_number = parsed_result.get("page_number")
            try:
                text = parsed_result.get("content", "")
                image_url = url_by_page.get(page_number)

                # Load the image from its URL.
                image = None
                try:
                    response = requests.get(image_url, timeout=30)
                    response.raise_for_status()
                    image = Image.open(io.BytesIO(response.content))
                except Exception as e:
                    print(f"Failed to load image {image_url}: {e}")

                # Compute the multimodal embedding.
                print(f"Generating the multimodal embedding for page {page_number}...")
                embedding = state.embedding_model.get_multimodal_embedding(text, image)
                # Keep the first 1024 dimensions as the dense vector.
                dense_vector_1024 = embedding[:1024]

                # Build the document.
                document = {
                    "id": f"{state.document_id}_{page_number}" if state.document_id else f"image_{state.dataset_id}_{page_number}",
                    "file_name": file_name,
                    "file_page_count": file_page_count,
                    "page_number": page_number,
                    "content": text,
                    "image_path": image_url,  # An image ID could be generated here instead.
                    "dense_vector_1024": dense_vector_1024,
                    "dataset_id": state.dataset_id,
                    "document_id": state.document_id,
                }
                documents_to_store.append(document)
                print(f"Page {page_number} vectorized")
            except Exception as e:
                print(f"Vectorization failed for page {page_number}: {e}")

        # Bulk insert.
        if documents_to_store:
            print(f"Inserting {len(documents_to_store)} documents")
            result = state.vector_db.bulk_insert(index_name, documents_to_store)
            print(f"Insert result: {result}")

        return {
            "vectorized_results": documents_to_store,
            "vectorized_images": len(documents_to_store),
            "is_complete": True,
        }

    def _complete_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Completion node."""
        print(
            f"Image parsing workflow finished: {len(state.parsed_results)} images parsed, "
            f"{state.vectorized_images} images vectorized"
        )
        return {
            "is_complete": True,
        }

    def run(self, zip_file_path: str, book_name: str, dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]:
        """
        Run the image parsing workflow.

        Args:
            zip_file_path: Path to the image archive (zip).
            book_name: Book title.
            dataset_id: Dataset ID.
            ragflow_api_url: RAGFlow API URL.
            rag_flow_api_key: RAGFlow API key.

        Returns:
            Dict: The final workflow state.
        """
        initial_state = ImageParsingState(
            zip_file_path=zip_file_path,
            book_name=book_name,
            dataset_id=dataset_id,
            embedding_model=Embedding(
                model_name=ModelConfig.get_multimodal_embedding_model_name(),
                api_key=ModelConfig.get_dashscope_api_key(),
            ),
            ragflow_service=RAGFlowService(base_url=ragflow_api_url, api_key=rag_flow_api_key),
        )
        result = self.workflow.invoke(initial_state)
        # LangGraph normally returns a dict; fall back to model_dump() otherwise.
        if isinstance(result, dict):
            return result
        return result.model_dump()
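

# Minimal usage sketch (illustrative only): every argument below is a
# hypothetical placeholder, not a value taken from this project.
if __name__ == "__main__":
    workflow = ImageParsingWorkflow()
    final_state = workflow.run(
        zip_file_path="/tmp/example_book_images.zip",  # hypothetical path
        book_name="example_book",                      # hypothetical title
        dataset_id="demo-dataset-001",                 # hypothetical dataset ID
        ragflow_api_url="http://localhost:9380",       # hypothetical RAGFlow endpoint
        rag_flow_api_key="YOUR_RAGFLOW_API_KEY",       # placeholder API key
    )
    print(f"Parsed pages: {final_state.get('processed_images')}")
    print(f"Vectorized pages: {final_state.get('vectorized_images')}")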