Explorar o código

19维度图书解析流程调整-新增滑动窗口模式

yingge hai 3 meses
pai
achega
4b4e2b9984

+ 6 - 6
src/datasets/parser/nodes/image_parse_node.py

@@ -147,12 +147,12 @@ class ImageParseNode(BaseNode):
         try:
             # 使用共享的Parser实例(已启用图片编码缓存)
             result = self.parser.parse_image(image, page_number, prompt)
-            content = result.get("content", "")
+            # content = result.get("content", "")
             
             # JSON解析也在工作线程中完成,避免阻塞主线程
-            parsed_content = parse_json_response(content, expected_type=dict)
+            # parsed_content = parse_json_response(content, expected_type=dict)
             
-            result["content"] = parsed_content
+            # result["content"] = parsed_content
             
             logger.debug(f"第 {page_number} 页解析完成")
             
@@ -194,11 +194,11 @@ class ImageParseNode(BaseNode):
         try:
             # 使用共享的Parser实例
             result = self.parser.parse_image(book_image, 0, prompt_template)
-            content = result.get("content", "")
-            parsed_content = parse_json_response(content, expected_type=dict)
+            # content = result.get("content", "")
+            # parsed_content = parse_json_response(content, expected_type=dict)
             
             logger.info("完整书本图片解析完成")
-            return parsed_content
+            return result
         except Exception as e:
             logger.error(f"解析完整书本图片时出错: {str(e)}")
             return {

+ 134 - 105
src/datasets/parser/nodes/result_aggregation_node.py

@@ -58,22 +58,17 @@ class ResultAggregationNode(BaseNode):
 
     def _get_default_output(self) -> str:
         """获取默认输出格式"""
-        return """```json
-{
-  "维度字段1": [
-    {
-      "内容": "...",
-      "分析": "..."
-    }
-  ],
-  "维度字段2": [
-    {
-      "内容": "...",
-      "分析": "..."
-    }
-  ]
-}
-```"""
+        return """
+        ### 页码:{page_number}
+        #### 分块ID:分块ID
+        **分块类型**:{{类型}}
+        **分块标题**:{{标题}}
+        **分块摘要**:{{一句话摘要}}
+        **分块标签**:标签1, 标签2
+        **分块内容**:
+            - **内容描述**:{{描述图片中观察到的原始文字或图像信息}}
+            - **深度分析**:{{基于标准的合规性或逻辑判定逻辑}}
+        """
     
     def _build_aggregation_prompt(
         self, 
@@ -104,14 +99,14 @@ class ResultAggregationNode(BaseNode):
             parsed_content = result.get("parsed_content", {})
             
             results_text.append(f"## 窗口 {i} (中心页: {center_page}, 范围: {page_range})")
-            if isinstance(parsed_content, (dict, list)):
-                # 如果是对象,转为格式化字符串
-                content_str = json.dumps(parsed_content, ensure_ascii=False, indent=2)
-            else:
-                # 如果是字符串或其他,转为强类型 string 即可
-                content_str = str(parsed_content)
-            results_text.append(content_str)
-            results_text.append("")
+            # if isinstance(parsed_content, (dict, list)):
+            #     # 如果是对象,转为格式化字符串
+            #     content_str = json.dumps(parsed_content, ensure_ascii=False, indent=2)
+            # else:
+            #     # 如果是字符串或其他,转为强类型 string 即可
+            #     content_str = str(parsed_content)
+            results_text.append(parsed_content)
+            results_text.append("---")
         
         prompt = f"""你是一个专业的内容分析专家。现在需要你汇总和整合多个滑动窗口的解析结果。
 
@@ -130,14 +125,17 @@ class ResultAggregationNode(BaseNode):
 1. 仔细分析所有窗口的结果,识别重复和冗余信息
 2. 保留所有独特的、有价值的内容
 3. 对于重复出现的内容,只保留一次,选择描述最完整的版本
-4. 按照原始JSON格式输出,保持维度字段结构不变
+4. 按照原始格式输出,保持文本维度结构不变
 5. 每个维度字段下的"分块"数组应该包含所有去重后的独特内容
-6. 必须输出有效的JSON格式,不要添加任何额外的说明文字
+6. 必须严格包含输出格式的内容,不要添加任何额外的说明文字
+7. 对于角色人设,务必根据角色名称判断是否重复,如果重复,则选择描述最完整的版本
+8. 避免冗长描述
 
 **输出格式示例:**
 {output_format}
+[页码范围]:(2-5)
 
-请直接输出汇总后的JSON结果:"""
+请直接输出汇总后的结果:"""
         
         return prompt
     
@@ -252,78 +250,106 @@ class ResultAggregationNode(BaseNode):
             logger.error(f"上传图片到MinIO失败: {str(e)}")
             return None
     
+    def _extract_page_range_from_text(self, text: str) -> List[int]:
+        """
+        从文本中提取页码范围标记 [页码范围]:(2-5)
+        
+        Args:
+            text: 包含页码范围标记的文本
+            
+        Returns:
+            页码列表,例如 [2, 3, 4, 5]
+        """
+        import re
+        
+        # 匹配模式:[页码范围]:(数字-数字) 或 [页码范围]:(数字)
+        pattern = r'\[页码范围\]:\((\d+)(?:-(\d+))?\)'
+        
+        match = re.search(pattern, text)
+        if match:
+            start_page = int(match.group(1))
+            end_page = int(match.group(2)) if match.group(2) else start_page
+            
+            # 生成页码列表
+            page_numbers = list(range(start_page, end_page + 1))
+            logger.debug(f"从文本中提取到页码范围: {start_page}-{end_page} → {page_numbers}")
+            return page_numbers
+        
+        logger.debug("未找到页码范围标记,返回空列表")
+        return []
+    
     def _extract_page_numbers_from_windows(
         self, 
         windowed_results: List[Dict[str, Any]],
         chunk_content: str
     ) -> List[int]:
         """
-        从窗口结果中提取分块对应的页码范围
+        从窗口结果中提取分块对应的页码范围(适配Markdown格式)
         
         策略:找到包含该分块内容的所有窗口,合并它们的页码范围
         
         Args:
             windowed_results: 窗口结果列表
-            chunk_content: 分块内容
+            chunk_content: 分块内容(Markdown格式的文本)
             
         Returns:
             页码列表
         """
         page_numbers = set()
         
+        # 如果chunk_content为空或太短,使用默认策略
+        if not chunk_content or len(chunk_content.strip()) < 10:
+            if windowed_results:
+                page_numbers.add(windowed_results[0].get("center_page", 1))
+            return sorted(list(page_numbers))
+        
+        # 提取chunk_content的关键内容用于匹配
+        # 去除Markdown标记,提取纯文本内容
+        chunk_text = chunk_content.replace("###", "").replace("####", "").replace("**", "").replace("- ", "").replace("---", "").strip()
+        # 取前100个字符作为匹配关键词(避免过长)
+        chunk_keywords = chunk_text[:100] if len(chunk_text) > 100 else chunk_text
+        
         # 遍历所有窗口,找到包含该内容的窗口
         for result in windowed_results:
             parsed_content = result.get("parsed_content", {})
             
             # 检查该窗口是否包含该分块内容
             content_found = False
-            for key, value in parsed_content.items():
-                if isinstance(value, list):
-                    for item in value:
-                        if isinstance(item, dict):
-                            item_content = item.get("内容", "")
-                            # 简单的内容匹配(可以优化为更精确的匹配)
-                            if chunk_content and item_content and chunk_content in item_content:
-                                content_found = True
-                                break
-                if content_found:
-                    break
+            # 直接进行文本匹配
+            if chunk_keywords in parsed_content:
+                content_found = True
             
             # 如果找到,添加该窗口的页码范围
             if content_found:
                 window_page_numbers = result.get("page_numbers", [])
                 page_numbers.update(window_page_numbers)
         
-        # 如果没有找到匹配的窗口,使用中心页
+        # 如果没有找到匹配的窗口,使用第一个窗口的中心页
         if not page_numbers and windowed_results:
-            # 使用第一个窗口的中心页作为默认值
             page_numbers.add(windowed_results[0].get("center_page", 1))
         
         return sorted(list(page_numbers))
     
     def _split_by_chunks(
         self, 
-        aggregated_result: Dict[str, Any],
+        aggregated_result: Any,
         windowed_results: List[Dict[str, Any]],
         split_pages: List[Dict[str, Any]]
     ) -> List[Dict[str, Any]]:
         """
-        将汇总结果按"分块"对象拆分,并为每个分块拼接对应页码的图片上传到MinIO
+        将汇总结果按分块拆分,并为每个分块拼接对应页码的图片上传到MinIO
+        
+        支持两种格式:
+        1. Markdown格式:使用 --- 分隔分块
+        2. JSON格式:字典结构(向后兼容)
         
         Args:
-            aggregated_result: 汇总后的JSON结果
+            aggregated_result: 汇总后的结果(Markdown字符串或JSON字典)
             windowed_results: 窗口结果列表(用于提取页码范围)
             split_pages: 原始分页图片列表
             
         Returns:
-            拆分后的分块列表,格式为:
-            {
-                "page_number": int,
-                "chunk_id": "",
-                "content": dict,  # 直接存储chunk的JSON对象
-                "model": str,
-                "image_path": str
-            }
+            拆分后的分块列表
         """
         parsed_results = []
         
@@ -334,54 +360,60 @@ class ResultAggregationNode(BaseNode):
         page_map = {page.get('page_number'): page.get('image') for page in split_pages}
         
         chunk_index = 0
-        
-        # 遍历JSON中的所有维度字段(排除"页码"等元数据字段)
-        for key, value in aggregated_result.items():
-            if key in ["页码", "page_number"] or not isinstance(value, list):
-                continue
-            
-            # 将每个维度的分块对象展开到parsed_results中
-            for chunk in value:
-                if isinstance(chunk, dict):
-                    chunk_content = chunk.get("内容", "")
-                    
-                    # 提取该分块对应的页码范围
-                    page_numbers = self._extract_page_numbers_from_windows(
-                        windowed_results, 
-                        chunk_content
-                    )
-                    
-                    # 使用第一个页码作为主页码
-                    page_number = page_numbers[0] if page_numbers else 0
-                    
-                    # 拼接对应页码的图片
-                    stitched_image = self._stitch_images_for_pages(page_numbers, page_map)
-                    
-                    # 上传到MinIO
     
-                    image_url = None
-                    if stitched_image:
-                        image_url = self._upload_image_to_minio(
-                            stitched_image,
-                            self.dimension_id or 0,
-                            chunk_index
-                        )
-                    
-                    # 统一格式:直接存储chunk的JSON对象
-                    parsed_results.append({
-                        "page_number": page_number,
-                        "chunk_id": "",  # 后续由RAGFlowChunkNode填充
-                        "content": chunk,  # 直接存储chunk的JSON对象
-                        "model": self.model_name,
-                        "image_path": image_url or "",  # 拼接后图片的MinIO URL
-                        # 额外信息(供调试和后续使用)
-                        "_dimension": key,  # 维度名称
-                        "_page_numbers": page_numbers,  # 该分块对应的页码列表
-                        "_image": stitched_image,  # PIL图像对象(供后续节点使用)
-                        "_image_url": image_url  # MinIO URL
-                    })
-                    
-                    chunk_index += 1
+        # Markdown格式:按 --- 分割分块
+        logger.info("检测到Markdown格式的汇总结果,按 --- 分割分块")
+            
+        # 按 --- 分割
+        chunks = aggregated_result.split('---')
+            
+        for chunk_text in chunks:
+            chunk_text = chunk_text.strip()
+            if not chunk_text:
+                continue
+                
+            # 从文本中提取页码范围标记 [页码范围]:(2-5)
+            page_numbers = self._extract_page_range_from_text(chunk_text)
+                
+            # 如果没有找到页码范围标记,尝试从窗口结果中匹配
+            if not page_numbers:
+                logger.debug("未找到页码范围标记,尝试从窗口结果中匹配")
+                page_numbers = self._extract_page_numbers_from_windows(
+                    windowed_results, 
+                    chunk_text
+                )
+                
+            # 使用第一个页码作为主页码
+            page_number = page_numbers[0] if page_numbers else 0
+                
+            # 拼接对应页码的图片
+            stitched_image = self._stitch_images_for_pages(page_numbers, page_map)
+                
+            # 上传到MinIO
+            image_url = None
+            if stitched_image:
+                image_url = self._upload_image_to_minio(
+                    stitched_image,
+                    self.dimension_id or 0,
+                    chunk_index
+                )
+                
+            # 统一格式:直接存储chunk文本
+            parsed_results.append({
+                "page_number": page_number,
+                "chunk_id": "",
+                "content": chunk_text,  # Markdown格式的文本
+                "model": self.model_name,
+                "image_path": image_url or "",
+                # 额外信息
+                "_page_numbers": page_numbers,
+                "_image": stitched_image,
+                "_image_url": image_url
+            })
+                
+            chunk_index += 1
+        else:
+            logger.warning(f"未知的汇总结果格式: {type(aggregated_result)}")
         
         logger.info(f"共生成 {len(parsed_results)} 个分块,已上传到MinIO")
         
@@ -425,12 +457,9 @@ class ResultAggregationNode(BaseNode):
             
             logger.debug(f"Chat模型返回内容: {aggregated_content[:500]}...")
             
-            # 解析JSON结果
-            aggregated_result = parse_json_response(aggregated_content, expected_type=dict)
-            
             # 按"分块"拆分结果,并为每个分块拼接图片上传到MinIO
             parsed_results = self._split_by_chunks(
-                aggregated_result, 
+                aggregated_content,
                 windowed_results,
                 split_pages
             )
@@ -439,7 +468,7 @@ class ResultAggregationNode(BaseNode):
             
             return {
                 "parsed_results": parsed_results,
-                "aggregated_result": aggregated_result  # 保留完整的汇总结果供调试
+                "aggregated_result": aggregated_content  # 保留完整的汇总结果供调试
             }
             
         except Exception as e:

+ 6 - 6
src/datasets/parser/nodes/sliding_window_parse_node_async.py

@@ -137,15 +137,15 @@ class SlidingWindowParseNodeAsync(BaseNode):
             try:
                 # 异步调用模型
                 result = await self.parser.parse_image_async(image, center_page, prompt)
-                content = result.get("content", "")
+                parsed_content = result.get("content", "")
                 
                 # JSON解析(在协程中完成)
-                parsed_content = parse_json_response(content, expected_type=dict)
+                # parsed_content = parse_json_response(content, expected_type=dict)
                 
-                # 如果解析失败返回字符串,使用空字典并记录警告
-                if isinstance(parsed_content, str):
-                    logger.warning(f"窗口 (中心页: {center_page}) 异步解析结果无法转换为JSON,使用原始字符串")
-                    parsed_content = {"原始内容": parsed_content}
+                # # 如果解析失败返回字符串,使用空字典并记录警告
+                # if isinstance(parsed_content, str):
+                #     logger.warning(f"窗口 (中心页: {center_page}) 异步解析结果无法转换为JSON,使用原始字符串")
+                #     parsed_content = {"原始内容": parsed_content}
                 
                 logger.debug(f"窗口 (中心页: {center_page}) 异步解析完成")