#!/usr/bin/env python3
"""
Image parsing workflow.
"""
import io
import sys
import os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
import requests

# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from langgraph.graph import StateGraph, START, END
from typing import List, Dict, Any
from pydantic import BaseModel, Field, ConfigDict
from model.qwen_vl import QWenVLParser
from utils.ragflow.ragflow_service import RAGFlowService
from utils.vector_db import VectorDBFactory
from model.multimodal_embedding import Embedding
from utils.minio.image_util import image_util
from conf.config import ModelConfig


# Workflow state definition
class ImageParsingState(BaseModel):
    """State for the image parsing workflow."""
    model_config = ConfigDict(arbitrary_types_allowed=True)
    zip_file_path: str = Field(..., description="Path to the image archive (zip)")
    book_name: str = Field(..., description="Book title")
    dataset_id: str = Field(..., description="Dataset ID")
    ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFlow service")
    vector_db: Any = Field(default_factory=VectorDBFactory.get_vector_db, description="Vector database instance")
    embedding_model: Embedding = Field(default_factory=Embedding, description="Multimodal embedding model instance")
    document_id: str = Field(default="", description="Document ID")
    split_images: List[Dict[str, Any]] = Field(default_factory=list, description="List of split images, with image URL and page number")
    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="List of parsing results")
    vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="List of vectorized results")
    processed_images: int = Field(default=0, description="Number of images processed")
    vectorized_images: int = Field(default=0, description="Number of images vectorized")
    is_complete: bool = Field(default=False, description="Whether processing is complete")


# Workflow builder
class ImageParsingWorkflow:
    """Image parsing workflow."""

    def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
        """
        Initialize the image parsing workflow.

        Args:
            model_name: Name of the Qwen VL model
        """
        self.model_name = model_name
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build the LangGraph workflow; a conditional route gates the parallel parsing step."""
        # Create the state graph
        graph = StateGraph(ImageParsingState)

        # Add nodes
        graph.add_node("upload_images", self._upload_images_node)
        graph.add_node("parse_image", self._parse_image_node)
        graph.add_node("vectorize_store", self._vectorize_store_node)
        graph.add_node("complete", self._complete_node)

        # Define edges
        graph.add_edge(START, "upload_images")
        graph.add_edge("upload_images", "parse_image")

        # Conditional edge: decide whether parsing should continue
        graph.add_conditional_edges(
            "parse_image",
            self._should_continue_parsing,
            {
                "continue": "parse_image",
                "complete": "vectorize_store"
            }
        )

        graph.add_edge("vectorize_store", "complete")
        graph.add_edge("complete", END)

        # Compile the workflow
        return graph.compile()

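    # Graph topology, as wired above (the "continue" self-loop on parse_image
    # exists but is never taken, because parsing runs in a single parallel pass):
    #
    #   START -> upload_images -> parse_image --"complete"--> vectorize_store -> complete -> END
    #                              ^        |
    #                              +--------+ "continue" (unused)
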
    def _upload_images_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Upload node: hand the image archive to image_util."""
        print(f"Processing image archive: {state.zip_file_path}")

        try:
            # Process the archive and get back the list of image URLs
            image_urls = image_util.process_image_zip(
                state.zip_file_path,
                state.book_name
            )

            print(f"Archive processed, {len(image_urls)} images in total")

            # Build split_images in the same format as the PDF parsing workflow
            split_images = []
            for i, url in enumerate(image_urls):
                split_images.append({
                    "page_number": i + 1,
                    "image_url": url,
                    "image": None  # loaded later, at parse time
                })

            return {
                "split_images": split_images,
                "processed_images": 0,
                "is_complete": False
            }
        except Exception as e:
            print(f"Error while processing the image archive: {str(e)}")
            raise

    def _parse_single_page(self, image_info: Dict[str, Any], model_name: str) -> Dict[str, Any]:
        """Parse a single image (used by the parallel workers)."""
        prompt = """
        You are a picture-book author whose content is suitable for children aged 0-12.
        Task: from the given picture-book illustration and its text, extract the elements, actions, and emotions in the illustration, and describe each element independently.
        Note: descriptions must be positive and align with core socialist values.
        Output requirements:
        1. Output JSON with the structure:
        {
            "page_number": the page number,
            "content": the original text on the page,
            "elements": [
                {
                    "element": "element name",
                    "description": "detailed description"
                },
                ...
            ]
        }
        2. Each element description must be clear and concise, at most 50 Chinese characters.
        3. Each description must refer to an element actually present in the illustration, not stand apart from it.
        4. Each description must conform to social values and contain nothing negative or immoral.
        """

        page_number = image_info["page_number"]
        image_url = image_info["image_url"]

        print(f"Parsing page {page_number}, image URL: {image_url}")

        try:
            # Download the image once, with a timeout and status check,
            # then open it from the in-memory bytes
            response = requests.get(image_url, timeout=30)
            response.raise_for_status()
            image = Image.open(io.BytesIO(response.content))

            # Parse the image with the Qwen VL model; each worker gets its
            # own parser instance, so no state is shared across threads
            parser = QWenVLParser(model_name)
            result = parser.parse_image(image, page_number, prompt)

            print(f"Page {page_number} parsed")
            return result
        except Exception as e:
            print(f"Error while parsing page {page_number}: {str(e)}")
            raise

    def _parse_image_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Parse node: process all images in parallel."""
        if not state.split_images:
            return state.model_dump()

        print(f"Parsing {len(state.split_images)} images in parallel")

        parsed_results = []

        # Parse images concurrently with a thread pool
        with ThreadPoolExecutor(max_workers=4) as executor:
            # Submit one parsing task per image
            future_to_image = {
                executor.submit(self._parse_single_page, image_info, self.model_name): image_info
                for image_info in state.split_images
            }

            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_image):
                try:
                    result = future.result()
                    parsed_results.append(result)
                except Exception as e:
                    image_info = future_to_image[future]
                    print(f"Error while parsing page {image_info['page_number']}: {str(e)}")

        # Sort results by page number
        parsed_results.sort(key=lambda x: x["page_number"])

        print(f"All images parsed, {len(parsed_results)} results")

        return {
            "split_images": state.split_images,  # kept so later nodes can access the images
            "parsed_results": parsed_results,
            "processed_images": len(parsed_results),
            "is_complete": True
        }

    def _should_continue_parsing(self, state: ImageParsingState) -> str:
        """Routing function: decide whether parsing should continue."""
        # _parse_image_node processes every image in one parallel pass,
        # so this router always returns "complete"
        return "complete"

    def _vectorize_store_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Vectorize-and-store node."""
        print(f"Vectorizing {len(state.parsed_results)} images for storage")

        # Create the index if it does not already exist
        index_name = f"image_documents_{state.dataset_id}"
        state.vector_db.create_index(index_name)

        # Documents to be inserted
        documents_to_store = []

        # File name and total page count
        file_name = f"{state.book_name}.zip"
        file_page_count = len(state.split_images)

        # Map page numbers to image URLs; parsed_results may be missing pages
        # that failed to parse, so positional indexing could mis-align
        url_by_page = {info["page_number"]: info.get("image_url") for info in state.split_images}

        # Build one vectorized document per parsing result
        for parsed_result in state.parsed_results:
            page_number = parsed_result.get("page_number")
            try:
                text = parsed_result.get("content", "")
                image_url = url_by_page.get(page_number)

                # Load the image from its URL
                image = None
                try:
                    response = requests.get(image_url, timeout=30)
                    response.raise_for_status()
                    image = Image.open(io.BytesIO(response.content))
                except Exception as e:
                    print(f"Failed to load image {image_url}: {str(e)}")

                # Compute the multimodal embedding
                print(f"Generating the multimodal embedding for page {page_number}...")
                embedding = state.embedding_model.get_multimodal_embedding(text, image)

                # Dense 1024-dimensional vector (first 1024 dimensions)
                dense_vector_1024 = embedding[:1024]

                # Build the document
                document = {
                    "id": f"{state.document_id}_{page_number}" if state.document_id else f"image_{state.dataset_id}_{page_number}",
                    "file_name": file_name,
                    "file_page_count": file_page_count,
                    "page_number": page_number,
                    "content": text,
                    "image_path": image_url,  # could be replaced with a generated image ID
                    "dense_vector_1024": dense_vector_1024,
                    "dataset_id": state.dataset_id,
                    "document_id": state.document_id
                }

                documents_to_store.append(document)
                print(f"Page {page_number} vectorized")
            except Exception as e:
                print(f"Vectorization failed for page {page_number}: {str(e)}")

        # Bulk insert
        if documents_to_store:
            print(f"Inserting {len(documents_to_store)} documents")
            result = state.vector_db.bulk_insert(index_name, documents_to_store)
            print(f"Insert result: {result}")

        return {
            "vectorized_results": documents_to_store,
            "vectorized_images": len(documents_to_store),
            "is_complete": True
        }

    def _complete_node(self, state: ImageParsingState) -> Dict[str, Any]:
        """Completion node."""
        print(f"Image parsing workflow finished: {len(state.parsed_results)} images parsed, {state.vectorized_images} vectorized")
        return {
            "is_complete": True
        }

    def run(self, zip_file_path: str, book_name: str, dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]:
        """
        Run the image parsing workflow.

        Args:
            zip_file_path: Path to the image archive (zip)
            book_name: Book title
            dataset_id: Dataset ID
            ragflow_api_url: RAGFlow API URL
            rag_flow_api_key: RAGFlow API key

        Returns:
            Dict: the final workflow state
        """
        initial_state = ImageParsingState(
            zip_file_path=zip_file_path,
            book_name=book_name,
            dataset_id=dataset_id,
            embedding_model=Embedding(model_name=ModelConfig.get_multimodal_embedding_model_name(), api_key=ModelConfig.get_dashscope_api_key()),
            ragflow_service=RAGFlowService(base_url=ragflow_api_url, api_key=rag_flow_api_key)
        )
        result = self.workflow.invoke(initial_state)

        # invoke() normally returns a dict of state values; fall back to
        # model_dump() if a state object comes back instead
        if isinstance(result, dict):
            return result
        else:
            return result.model_dump()
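

# Minimal usage sketch. The archive path, dataset ID, RAGFlow endpoint, and
# API key below are placeholders for illustration, not real deployment values.
if __name__ == "__main__":
    workflow = ImageParsingWorkflow()
    final_state = workflow.run(
        zip_file_path="/tmp/sample_book_images.zip",  # hypothetical path
        book_name="sample_book",                      # hypothetical title
        dataset_id="demo-dataset-id",                 # hypothetical dataset ID
        ragflow_api_url="http://localhost:9380",      # hypothetical RAGFlow endpoint
        rag_flow_api_key="YOUR_RAGFLOW_API_KEY",      # placeholder key
    )
    print(f"Parsed {final_state['processed_images']} pages, "
          f"stored {final_state['vectorized_images']} documents")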