|
@@ -0,0 +1,541 @@
|
|
|
|
|
+"""
|
|
|
|
|
+结果汇总节点
|
|
|
|
|
+
|
|
|
|
|
+使用Chat模型汇总滑动窗口解析的所有结果,并为每个分块拼接对应页码的图片上传到MinIO。
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import json
|
|
|
|
|
+import re
|
|
|
|
|
+from io import BytesIO
|
|
|
|
|
+from typing import Dict, Any, List, Optional
|
|
|
|
|
+from PIL import Image
|
|
|
|
|
+from src.datasets.parser.core.base import BaseNode, BaseState
|
|
|
|
|
+from src.datasets.parser.core.registry import register_node
|
|
|
|
|
+from src.model.qwen_vl import QWenVLParser
|
|
|
|
|
+from src.conf.settings import model_settings
|
|
|
|
|
+from src.common.logging_config import get_logger
|
|
|
|
|
+from src.utils.json_utils import parse_json_response
|
|
|
|
|
+from src.utils.file.minio.minio_util import MinIOUtil
|
|
|
|
|
+from src.utils.file.image_util import ImageUtil
|
|
|
|
|
+
|
|
|
|
|
+logger = get_logger(__name__)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
@register_node()
class ResultAggregationNode(BaseNode):
    """
    Result aggregation node.

    Uses a chat model to aggregate all sliding-window parsing results,
    de-duplicating and consolidating the information.

    State fields read:
    - windowed_results: list of sliding-window parsing results
    - dimension_name: dimension name (used to build the aggregation prompt)

    State fields written:
    - parsed_results: aggregated parsing results, split per "分块" (chunk)
    """

    def __init__(
        self,
        model_name: Optional[str] = None,
        dimension_id: Optional[int] = None
    ):
        """
        Initialize the result aggregation node.

        Args:
            model_name: Chat model name. Falls back to the configured
                default when not provided (note: an empty string also
                falls back, because of the `or`).
            dimension_id: Dimension ID, used when naming uploaded images.
        """
        # `or` (not `is None`): any falsy model_name selects the default.
        self.model_name = model_name or model_settings.chat_model_name
        self.dimension_id = dimension_id
        # Cached JSON template shown to the model when the caller supplies
        # no explicit output format.
        self.default_output = self._get_default_output()
|
|
|
|
|
    @property
    def name(self) -> str:
        """Node name used for registration/lookup in the pipeline."""
        return "result_aggregation"
|
|
|
|
|
    def _get_default_output(self) -> str:
        """Return the default JSON output-format template shown to the model.

        NOTE: this is a runtime string inserted verbatim into the prompt,
        not documentation — do not reformat it.
        """
        return """```json
{
    "维度字段1": [
        {
            "内容": "...",
            "分析": "..."
        }
    ],
    "维度字段2": [
        {
            "内容": "...",
            "分析": "..."
        }
    ]
}
```"""
|
|
|
|
|
+ def _build_aggregation_prompt(
|
|
|
|
|
+ self,
|
|
|
|
|
+ windowed_results: List[Dict[str, Any]],
|
|
|
|
|
+ dimension_name: str = "未知维度",
|
|
|
|
|
+ output_format: Optional[str] = None
|
|
|
|
|
+ ) -> str:
|
|
|
|
|
+ """
|
|
|
|
|
+ 构建汇总提示词
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ windowed_results: 滑动窗口解析结果
|
|
|
|
|
+ dimension_name: 维度名称
|
|
|
|
|
+ output_format: 输出格式(可选)
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 汇总提示词
|
|
|
|
|
+ """
|
|
|
|
|
+ # 使用提供的格式或默认格式
|
|
|
|
|
+ if output_format is None:
|
|
|
|
|
+ output_format = self.default_output
|
|
|
|
|
+
|
|
|
|
|
+ # 将所有窗口结果格式化为文本
|
|
|
|
|
+ results_text = []
|
|
|
|
|
+ for i, result in enumerate(windowed_results, 1):
|
|
|
|
|
+ center_page = result.get("center_page", "?")
|
|
|
|
|
+ page_range = result.get("page_range", [])
|
|
|
|
|
+ parsed_content = result.get("parsed_content", {})
|
|
|
|
|
+
|
|
|
|
|
+ results_text.append(f"## 窗口 {i} (中心页: {center_page}, 范围: {page_range})")
|
|
|
|
|
+ results_text.append(json.dumps(parsed_content, ensure_ascii=False, indent=2))
|
|
|
|
|
+ results_text.append("")
|
|
|
|
|
+
|
|
|
|
|
+ prompt = f"""你是一个专业的内容分析专家。现在需要你汇总和整合多个滑动窗口的解析结果。
|
|
|
|
|
+
|
|
|
|
|
+**任务说明:**
|
|
|
|
|
+- 这些结果来自对同一本书使用滑动窗口方式的解析
|
|
|
|
|
+- 每个窗口包含3页内容(当前页+前后页),因此相邻窗口之间有重叠
|
|
|
|
|
+- 你需要去除重复信息,整合所有有效内容
|
|
|
|
|
+
|
|
|
|
|
+**维度名称:** {dimension_name}
|
|
|
|
|
+
|
|
|
|
|
+**滑动窗口解析结果:**
|
|
|
|
|
+
|
|
|
|
|
+{chr(10).join(results_text)}
|
|
|
|
|
+
|
|
|
|
|
+**输出要求:**
|
|
|
|
|
+1. 仔细分析所有窗口的结果,识别重复和冗余信息
|
|
|
|
|
+2. 保留所有独特的、有价值的内容
|
|
|
|
|
+3. 对于重复出现的内容,只保留一次,选择描述最完整的版本
|
|
|
|
|
+4. 按照原始JSON格式输出,保持维度字段结构不变
|
|
|
|
|
+5. 每个维度字段下的"分块"数组应该包含所有去重后的独特内容
|
|
|
|
|
+6. 必须输出有效的JSON格式,不要添加任何额外的说明文字
|
|
|
|
|
+
|
|
|
|
|
+**输出格式示例:**
|
|
|
|
|
+{output_format}
|
|
|
|
|
+
|
|
|
|
|
+请直接输出汇总后的JSON结果:"""
|
|
|
|
|
+
|
|
|
|
|
+ return prompt
|
|
|
|
|
+
|
|
|
|
|
+ def _stitch_images_for_pages(
|
|
|
|
|
+ self,
|
|
|
|
|
+ page_numbers: List[int],
|
|
|
|
|
+ page_map: Dict[int, Image.Image]
|
|
|
|
|
+ ) -> Optional[Image.Image]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 为指定页码范围拼接图片
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ page_numbers: 页码列表
|
|
|
|
|
+ page_map: 页码到图片的映射
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 拼接后的图片,如果失败返回None
|
|
|
|
|
+ """
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 获取对应页码的图片
|
|
|
|
|
+ images = []
|
|
|
|
|
+ for page_num in page_numbers:
|
|
|
|
|
+ image = page_map.get(page_num)
|
|
|
|
|
+ if image and isinstance(image, Image.Image):
|
|
|
|
|
+ images.append(image)
|
|
|
|
|
+ else:
|
|
|
|
|
+ logger.warning(f"页码 {page_num} 的图片无效")
|
|
|
|
|
+
|
|
|
|
|
+ if not images:
|
|
|
|
|
+ logger.warning(f"页码范围 {page_numbers} 没有有效图片")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # 计算拼接后图片的尺寸
|
|
|
|
|
+ max_width = max(img.width for img in images)
|
|
|
|
|
+ total_height = sum(img.height for img in images)
|
|
|
|
|
+
|
|
|
|
|
+ # 创建新的空白图片
|
|
|
|
|
+ stitched_image = Image.new('RGB', (max_width, total_height), color='white')
|
|
|
|
|
+
|
|
|
|
|
+ # 垂直拼接所有图片
|
|
|
|
|
+ current_y = 0
|
|
|
|
|
+ for img in images:
|
|
|
|
|
+ x_offset = (max_width - img.width) // 2
|
|
|
|
|
+ stitched_image.paste(img, (x_offset, current_y))
|
|
|
|
|
+ current_y += img.height
|
|
|
|
|
+
|
|
|
|
|
+ # 压缩图片
|
|
|
|
|
+ image_util = ImageUtil()
|
|
|
|
|
+
|
|
|
|
|
+ # 检查像素数量是否超过限制
|
|
|
|
|
+ max_pixels = Image.MAX_IMAGE_PIXELS
|
|
|
|
|
+ total_pixels = stitched_image.width * stitched_image.height
|
|
|
|
|
+ if total_pixels > max_pixels:
|
|
|
|
|
+ logger.warning(f"图片像素数 ({total_pixels}) 超过限制,进行缩放")
|
|
|
|
|
+ target_pixels = max_pixels * 0.8
|
|
|
|
|
+ scale_ratio = (target_pixels / total_pixels) ** 0.5
|
|
|
|
|
+ new_width = int(stitched_image.width * scale_ratio)
|
|
|
|
|
+ new_height = int(stitched_image.height * scale_ratio)
|
|
|
|
|
+ stitched_image = stitched_image.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
|
|
|
+
|
|
|
|
|
+ # 压缩图片
|
|
|
|
|
+ image_stream = BytesIO()
|
|
|
|
|
+ stitched_image.save(image_stream, format='JPEG')
|
|
|
|
|
+ image_stream.seek(0)
|
|
|
|
|
+ compressed_bytes = image_util._compress_image_to_bytes(image_stream)
|
|
|
|
|
+ compressed_image = Image.open(BytesIO(compressed_bytes))
|
|
|
|
|
+
|
|
|
|
|
+ return compressed_image
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"拼接页码 {page_numbers} 的图片失败: {str(e)}")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def _upload_image_to_minio(
|
|
|
|
|
+ self,
|
|
|
|
|
+ image: Image.Image,
|
|
|
|
|
+ dimension_id: int,
|
|
|
|
|
+ chunk_index: int
|
|
|
|
|
+ ) -> Optional[str]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 上传图片到MinIO
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ image: PIL图像对象
|
|
|
|
|
+ dimension_id: 维度ID
|
|
|
|
|
+ chunk_index: 分块索引
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ MinIO URL,如果失败返回None
|
|
|
|
|
+ """
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 将图片转换为字节流
|
|
|
|
|
+ image_stream = BytesIO()
|
|
|
|
|
+ image.save(image_stream, format='JPEG')
|
|
|
|
|
+ image_stream.seek(0)
|
|
|
|
|
+
|
|
|
|
|
+ # 生成文件名
|
|
|
|
|
+ filename = f"dimension_{dimension_id}_chunk_{chunk_index}.jpg"
|
|
|
|
|
+
|
|
|
|
|
+ # 上传到MinIO
|
|
|
|
|
+ # 上传图片到MinIO,获取URL
|
|
|
|
|
+ minio_util = MinIOUtil()
|
|
|
|
|
+ bucket_name = "bookpage"
|
|
|
|
|
+ url = minio_util.custom_upload_file(file=image_stream, original_filename=filename, bucket_name=bucket_name)
|
|
|
|
|
+
|
|
|
|
|
+ # url = minio_util.upload_file(image_stream, filename)
|
|
|
|
|
+
|
|
|
|
|
+ logger.debug(f"图片上传成功: {url}")
|
|
|
|
|
+ return url
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"上传图片到MinIO失败: {str(e)}")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def _extract_page_numbers_from_windows(
|
|
|
|
|
+ self,
|
|
|
|
|
+ windowed_results: List[Dict[str, Any]],
|
|
|
|
|
+ chunk_content: str
|
|
|
|
|
+ ) -> List[int]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 从窗口结果中提取分块对应的页码范围
|
|
|
|
|
+
|
|
|
|
|
+ 策略:找到包含该分块内容的所有窗口,合并它们的页码范围
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ windowed_results: 窗口结果列表
|
|
|
|
|
+ chunk_content: 分块内容
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 页码列表
|
|
|
|
|
+ """
|
|
|
|
|
+ page_numbers = set()
|
|
|
|
|
+
|
|
|
|
|
+ # 遍历所有窗口,找到包含该内容的窗口
|
|
|
|
|
+ for result in windowed_results:
|
|
|
|
|
+ parsed_content = result.get("parsed_content", {})
|
|
|
|
|
+
|
|
|
|
|
+ # 检查该窗口是否包含该分块内容
|
|
|
|
|
+ content_found = False
|
|
|
|
|
+ for key, value in parsed_content.items():
|
|
|
|
|
+ if isinstance(value, list):
|
|
|
|
|
+ for item in value:
|
|
|
|
|
+ if isinstance(item, dict):
|
|
|
|
|
+ item_content = item.get("内容", "")
|
|
|
|
|
+ # 简单的内容匹配(可以优化为更精确的匹配)
|
|
|
|
|
+ if chunk_content and item_content and chunk_content in item_content:
|
|
|
|
|
+ content_found = True
|
|
|
|
|
+ break
|
|
|
|
|
+ if content_found:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # 如果找到,添加该窗口的页码范围
|
|
|
|
|
+ if content_found:
|
|
|
|
|
+ window_page_numbers = result.get("page_numbers", [])
|
|
|
|
|
+ page_numbers.update(window_page_numbers)
|
|
|
|
|
+
|
|
|
|
|
+ # 如果没有找到匹配的窗口,使用中心页
|
|
|
|
|
+ if not page_numbers and windowed_results:
|
|
|
|
|
+ # 使用第一个窗口的中心页作为默认值
|
|
|
|
|
+ page_numbers.add(windowed_results[0].get("center_page", 1))
|
|
|
|
|
+
|
|
|
|
|
+ return sorted(list(page_numbers))
|
|
|
|
|
+
|
|
|
|
|
    def _split_by_chunks(
        self,
        aggregated_result: Dict[str, Any],
        windowed_results: List[Dict[str, Any]],
        split_pages: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Split the aggregated result into per-chunk records; for each chunk,
        stitch the images of its source pages and upload them to MinIO.

        Args:
            aggregated_result: Aggregated JSON result from the chat model.
            windowed_results: Window results (used to recover page ranges).
            split_pages: Original per-page images, as dicts with
                'page_number' and 'image' keys.

        Returns:
            List of chunk records shaped as:
            {
                "page_number": int,
                "chunk_id": "",
                "content": dict,   # the chunk's JSON object, stored as-is
                "model": str,
                "image_path": str
            }
            plus underscore-prefixed debug fields.
        """
        parsed_results = []

        if not aggregated_result:
            return parsed_results

        # Map page number -> PIL image for quick lookup while stitching.
        page_map = {page.get('page_number'): page.get('image') for page in split_pages}

        # Global counter across all dimensions; used to name uploaded images.
        chunk_index = 0

        # Walk every dimension field, skipping metadata keys such as "页码".
        for key, value in aggregated_result.items():
            if key in ["页码", "page_number"] or not isinstance(value, list):
                continue

            # Flatten each dimension's chunk objects into parsed_results.
            for chunk in value:
                if isinstance(chunk, dict):
                    chunk_content = chunk.get("内容", "")

                    # Recover which pages this chunk came from.
                    page_numbers = self._extract_page_numbers_from_windows(
                        windowed_results,
                        chunk_content
                    )

                    # First page number doubles as the chunk's primary page.
                    page_number = page_numbers[0] if page_numbers else 0

                    # Stitch the chunk's pages into a single image.
                    stitched_image = self._stitch_images_for_pages(page_numbers, page_map)

                    # Upload the stitched image to MinIO (best effort;
                    # image_url stays None when stitching/upload failed).
                    image_url = None
                    if stitched_image:
                        image_url = self._upload_image_to_minio(
                            stitched_image,
                            self.dimension_id or 0,
                            chunk_index
                        )

                    # Uniform record shape: the chunk JSON is stored verbatim.
                    parsed_results.append({
                        "page_number": page_number,
                        "chunk_id": "",  # filled in later by RAGFlowChunkNode
                        "content": chunk,  # the chunk JSON object, as-is
                        "model": self.model_name,
                        "image_path": image_url or "",  # MinIO URL of the stitched image
                        # Extra debug / downstream fields:
                        "_dimension": key,  # dimension name
                        "_page_numbers": page_numbers,  # pages backing this chunk
                        "_image": stitched_image,  # PIL image for later nodes
                        "_image_url": image_url  # MinIO URL
                    })

                    chunk_index += 1

        logger.info(f"共生成 {len(parsed_results)} 个分块,已上传到MinIO")

        return parsed_results
|
|
|
|
|
+ def execute(self, state: BaseState) -> Dict[str, Any]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 执行结果汇总
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ state: 包含windowed_results和split_pages的状态
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 包含parsed_results的更新字典
|
|
|
|
|
+ """
|
|
|
|
|
+ windowed_results = getattr(state, 'windowed_results', None)
|
|
|
|
|
+ split_pages = getattr(state, 'split_pages', None)
|
|
|
|
|
+ dimension_name = getattr(state, 'dimension_name', '未知维度')
|
|
|
|
|
+ prompt_template = getattr(state, 'dimension_prompt', None)
|
|
|
|
|
+
|
|
|
|
|
+ if not windowed_results:
|
|
|
|
|
+ logger.warning("没有待汇总的窗口结果")
|
|
|
|
|
+ return {"parsed_results": []}
|
|
|
|
|
+
|
|
|
|
|
+ if not split_pages:
|
|
|
|
|
+ logger.warning("缺少原始分页图片,无法拼接分块图片")
|
|
|
|
|
+ split_pages = []
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"开始汇总 {len(windowed_results)} 个窗口的解析结果")
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 获取原提示词中的输出格式
|
|
|
|
|
+ output_format = self._extract_output_format(prompt_template) if prompt_template else None
|
|
|
|
|
+
|
|
|
|
|
+ # 构建汇总提示词
|
|
|
|
|
+ prompt = self._build_aggregation_prompt(windowed_results, dimension_name, output_format)
|
|
|
|
|
+
|
|
|
|
|
+ # 使用Chat模型进行汇总
|
|
|
|
|
+ parser = QWenVLParser(self.model_name)
|
|
|
|
|
+ aggregated_content = parser.chat(prompt)
|
|
|
|
|
+
|
|
|
|
|
+ logger.debug(f"Chat模型返回内容: {aggregated_content[:500]}...")
|
|
|
|
|
+
|
|
|
|
|
+ # 解析JSON结果
|
|
|
|
|
+ aggregated_result = parse_json_response(aggregated_content, expected_type=dict)
|
|
|
|
|
+
|
|
|
|
|
+ # 按"分块"拆分结果,并为每个分块拼接图片上传到MinIO
|
|
|
|
|
+ parsed_results = self._split_by_chunks(
|
|
|
|
|
+ aggregated_result,
|
|
|
|
|
+ windowed_results,
|
|
|
|
|
+ split_pages
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"结果汇总完成,共生成 {len(parsed_results)} 个分块")
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "parsed_results": parsed_results,
|
|
|
|
|
+ "aggregated_result": aggregated_result # 保留完整的汇总结果供调试
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"结果汇总失败: {str(e)}")
|
|
|
|
|
+ # 如果汇总失败,尝试直接合并所有窗口结果
|
|
|
|
|
+ logger.warning("尝试使用简单合并策略")
|
|
|
|
|
+ return self._fallback_merge(windowed_results, split_pages)
|
|
|
|
|
+
|
|
|
|
|
+ def _extract_output_format(self, text: str) -> Optional[str]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 从提示词中提取输出格式部分
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ text: 提示词文本
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 输出格式文本,如果未找到返回None
|
|
|
|
|
+ """
|
|
|
|
|
+ if not text:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ pattern = r"(## 输出格式要求[\s\S]*?)(?=\n## |\Z)"
|
|
|
|
|
+ m = re.search(pattern, text)
|
|
|
|
|
+ return m.group(1).strip() if m else None
|
|
|
|
|
+
|
|
|
|
|
    def _fallback_merge(
        self,
        windowed_results: List[Dict[str, Any]],
        split_pages: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Fallback strategy: concatenate every window's chunks without the
        model-based de-duplication pass.

        Used when chat-model aggregation fails; overlapping windows may
        therefore produce duplicate chunks.

        Args:
            windowed_results: Window result list.
            split_pages: Original per-page images.

        Returns:
            {"parsed_results": [...]} with records shaped like those from
            _split_by_chunks:
            {
                "page_number": int,
                "chunk_id": "",
                "content": dict,   # the chunk's JSON object, stored as-is
                "model": str,
                "image_path": str
            }
            plus underscore-prefixed debug fields.
        """
        parsed_results = []
        # Map page number -> PIL image for stitching.
        page_map = {page.get('page_number'): page.get('image') for page in split_pages}
        # Global counter across all windows; used to name uploaded images.
        chunk_index = 0

        for result in windowed_results:
            parsed_content = result.get("parsed_content", {})
            center_page = result.get("center_page", 0)
            page_numbers = result.get("page_numbers", [center_page])

            # Walk every dimension field, skipping metadata keys.
            for key, value in parsed_content.items():
                if key in ["页码", "page_number"] or not isinstance(value, list):
                    continue

                # Flatten the chunk objects.
                for chunk in value:
                    if isinstance(chunk, dict):
                        # First page number doubles as the primary page.
                        page_number = page_numbers[0] if page_numbers else center_page

                        # Stitch this window's pages into one image.
                        stitched_image = self._stitch_images_for_pages(page_numbers, page_map)

                        # Upload to MinIO (best effort; None on failure).
                        image_url = None
                        if stitched_image:
                            image_url = self._upload_image_to_minio(
                                stitched_image,
                                self.dimension_id or 0,
                                chunk_index
                            )

                        # Uniform record shape: the chunk JSON is stored verbatim.
                        parsed_results.append({
                            "page_number": page_number,
                            "chunk_id": "",
                            "content": chunk,  # the chunk JSON object, as-is
                            "model": self.model_name,
                            "image_path": image_url or "",  # MinIO URL of the stitched image
                            # Extra debug fields
                            "_dimension": key,
                            "_page_numbers": page_numbers,
                            "_image": stitched_image,
                            "_image_url": image_url
                        })

                        chunk_index += 1

        logger.info(f"备用合并完成,共 {len(parsed_results)} 个分块")

        return {
            "parsed_results": parsed_results
        }
|