Просмотр исходного кода

19维度图书解析流程调整-新增滑动窗口模式

yingge 3 месяцев назад
Родитель
Commit
770cfa9bff

+ 52 - 42
src/datasets/parser/nodes/ragflow_nodes.py

@@ -277,50 +277,60 @@ class RAGFlowChunkNode(BaseNode):
         logger.info(f"开始创建Chunks,共 {len(parsed_results)} 页")
         
         for i, parsed_result in enumerate(parsed_results):
-            page_number = parsed_result.get("page_number", i + 1)
-            content = parsed_result.get("content", "")
-            
-            # 处理content:如果是字典,转换为JSON字符串;如果是字符串,直接使用
-            if isinstance(content, dict):
-                text = json.dumps(content, ensure_ascii=False, indent=2)
-            else:
-                text = str(content)
-            
-            # 优先从parsed_result中获取image_path(滑动窗口模式)
-            # 如果没有,则从split_pages中获取(分页模式)
-            image_path = parsed_result.get("image_path", "")
-            if not image_path and i < len(split_pages):
-                image_path = split_pages[i].get("image_path", "")
-            
-            # 生成img_id
-            img_id = ""
-            if image_path:
-                # 从URL或路径中提取文件名
-                filename = os.path.basename(image_path)
-                # 如果是MinIO URL,可能包含查询参数,需要清理
-                if '?' in filename:
-                    filename = filename.split('?')[0]
+            try:
+                # 安全获取page_number
+                if not isinstance(parsed_result, dict):
+                    logger.warning(f"解析结果 {i} 不是字典类型: {type(parsed_result)},跳过")
+                    continue
+                
+                page_number = parsed_result.get("page_number", i + 1)
+                content = parsed_result.get("content", "")
+                
+                # 处理content:如果是字典,转换为JSON字符串;如果是字符串,直接使用
+                if isinstance(content, dict):
+                    text = json.dumps(content, ensure_ascii=False, indent=2)
+                else:
+                    text = str(content)
+                
+                # 优先从parsed_result中获取image_path(滑动窗口模式)
+                # 如果没有,则从split_pages中获取(分页模式)
+                image_path = parsed_result.get("image_path", "")
+                if not image_path and i < len(split_pages):
+                    if isinstance(split_pages[i], dict):
+                        image_path = split_pages[i].get("image_path", "")
+                
                 # 生成img_id
-                img_id = f"bookpage-{filename.split('.')[0]}.png"
-            
-            chunk = self.ragflow_service.create_chunk(
-                dataset_id=page_dataset_id,
-                document_id=page_document_id,
-                content=text
-            )
-            chunk_id = chunk["chunk"]["id"]
-            parsed_result["chunk_id"] = chunk_id
-            logger.debug(f"创建第 {page_number} 页Chunk,ID: {chunk_id}, img_id: {img_id}")
-            
-            # 记录到定时任务表
-            if img_id:
-                get_chunk_record_service().record_chunk_add(
-                    database_name=vector_db_settings.infinity_ragflow_database,
-                    table_name=ragflow_settings.ragflow_dataset_prefix + "_" + ragflow_id,
-                    chunk_id=chunk_id,
-                    cond=f"_id = '{chunk_id}'",
-                    data={"img_id": img_id}
+                img_id = ""
+                if image_path:
+                    # 从URL或路径中提取文件名
+                    filename = os.path.basename(image_path)
+                    # 如果是MinIO URL,可能包含查询参数,需要清理
+                    if '?' in filename:
+                        filename = filename.split('?')[0]
+                    # 生成img_id
+                    img_id = f"bookpage-{filename.split('.')[0]}.png"
+                
+                chunk = self.ragflow_service.create_chunk(
+                    dataset_id=page_dataset_id,
+                    document_id=page_document_id,
+                    content=text
                 )
+                chunk_id = chunk["chunk"]["id"]
+                parsed_result["chunk_id"] = chunk_id
+                logger.debug(f"创建第 {page_number} 页Chunk,ID: {chunk_id}, img_id: {img_id}")
+                
+                # 记录到定时任务表
+                if img_id:
+                    get_chunk_record_service().record_chunk_add(
+                        database_name=vector_db_settings.infinity_ragflow_database,
+                        table_name=ragflow_settings.ragflow_dataset_prefix + "_" + ragflow_id,
+                        chunk_id=chunk_id,
+                        cond=f"_id = '{chunk_id}'",
+                        data={"img_id": img_id}
+                    )
+            except Exception as e:
+                logger.error(f"创建第 {i + 1} 个Chunk时出错: {str(e)}")
+                continue
         
         logger.info(f"Chunks创建完成")
         return {

+ 30 - 3
src/datasets/parser/nodes/result_aggregation_node.py

@@ -381,8 +381,18 @@ class ResultAggregationNode(BaseNode):
             return parsed_results
         
         try:
-            # 创建页码到图片的映射
-            page_map = {page.get('page_number'): page.get('image') for page in split_pages}
+            # 创建页码到图片的映射,安全处理可能的异常
+            page_map = {}
+            for page in split_pages:
+                if isinstance(page, dict):
+                    page_num = page.get('page_number')
+                    page_img = page.get('image')
+                    if page_num is not None and page_img is not None:
+                        page_map[page_num] = page_img
+                    else:
+                        logger.warning(f"页面数据缺少必要字段: {page}")
+                else:
+                    logger.warning(f"页面数据格式错误,期望字典,实际: {type(page)}")
         except Exception as e:
             logger.error(f"创建页码映射失败: {str(e)}")
             page_map = {}
@@ -632,7 +642,24 @@ class ResultAggregationNode(BaseNode):
             }
         """
         parsed_results = []
-        page_map = {page.get('page_number'): page.get('image') for page in split_pages}
+        
+        # 安全创建页码到图片的映射
+        page_map = {}
+        try:
+            for page in split_pages:
+                if isinstance(page, dict):
+                    page_num = page.get('page_number')
+                    page_img = page.get('image')
+                    if page_num is not None and page_img is not None:
+                        page_map[page_num] = page_img
+                    else:
+                        logger.warning(f"页面数据缺少必要字段: {page}")
+                else:
+                    logger.warning(f"页面数据格式错误,期望字典,实际: {type(page)}")
+        except Exception as e:
+            logger.error(f"创建页码映射失败: {str(e)}")
+            page_map = {}
+        
         chunk_index = 0
         
         for result in windowed_results:

+ 1 - 4
src/datasets/parser/nodes/sliding_window_parse_node.py

@@ -159,16 +159,13 @@ class SlidingWindowParseNode(BaseNode):
             # parse_image返回的是字典,需要提取content字段
             content = result.get("content", "")
             
-            # JSON解析也在工作线程中完成,避免阻塞主线程
-            parsed_content = parse_json_response(content, expected_type=dict)
-            
             logger.debug(f"窗口 (中心页: {center_page}) 解析完成")
             
             result_dict = {
                 "center_page": center_page,
                 "page_range": page_range,
                 "page_numbers": windowed_page.get("page_numbers", []),
-                "parsed_content": parsed_content
+                "parsed_content": content
             }
             
             # === 存入缓存 ===

+ 7 - 2
src/datasets/parser/workflow_nodes/dimension_page_split_node.py

@@ -16,6 +16,7 @@ from src.datasets.parser.nodes import (
     SummaryNode
 )
 from src.datasets.parser.states.parser_states import DynamicDimensionState
+from langfuse.langchain import CallbackHandler
 from src.common.logging_config import get_logger
 
 logger = get_logger(__name__)
@@ -38,6 +39,7 @@ class DimensionPageSplitNode(BaseNode):
     def __init__(
         self,
         dimension_id: int,
+        langfuse_handler: CallbackHandler,
         model_name: str = "Qwen/Qwen3-VL-8B-Instruct",
         max_workers: int = 5
     ):
@@ -50,6 +52,7 @@ class DimensionPageSplitNode(BaseNode):
             max_workers: 并行处理的最大工作线程数
         """
         self.dimension_id = dimension_id
+        self.langfuse_handler = langfuse_handler
         self.model_name = model_name
         self.max_workers = max_workers
     
@@ -148,8 +151,10 @@ class DimensionPageSplitNode(BaseNode):
         workflow = self._build_sub_workflow(state)
         
         # 执行子工作流
-        result = workflow.invoke(state)
-        
+        result = workflow.invoke(
+            state,
+            config={"callbacks": [self.langfuse_handler]}
+        )
         # 处理结果
         if isinstance(result, dict):
             final_result = result

+ 9 - 2
src/datasets/parser/workflow_nodes/dimension_sliding_window_node.py

@@ -21,6 +21,7 @@ from src.datasets.parser.nodes.sliding_window_parse_node import SlidingWindowPar
 from src.datasets.parser.nodes.sliding_window_parse_node_async import SlidingWindowParseNodeAsync
 from src.datasets.parser.nodes.result_aggregation_node import ResultAggregationNode
 from src.datasets.parser.states.parser_states import DynamicDimensionState
+from langfuse.langchain import CallbackHandler
 from src.common.logging_config import get_logger
 
 logger = get_logger(__name__)
@@ -49,12 +50,14 @@ class DimensionSlidingWindowNode(BaseNode):
     def __init__(
         self,
         dimension_id: int,
+        langfuse_handler: CallbackHandler,
         model_name: str = "Qwen/Qwen3-VL-8B-Instruct",
         max_workers: int = 5,
         window_size: int = 3,
         skip_stitching: bool = False,
         use_async: bool = False,
-        max_concurrent: int = 50
+        max_concurrent: int = 50,
+
     ):
         """
         初始化维度滑动窗口节点
@@ -69,6 +72,7 @@ class DimensionSlidingWindowNode(BaseNode):
             max_concurrent: 异步版本的最大并发数,默认50
         """
         self.dimension_id = dimension_id
+        self.langfuse_handler = langfuse_handler
         self.model_name = model_name
         self.max_workers = max_workers
         self.window_size = window_size
@@ -220,7 +224,10 @@ class DimensionSlidingWindowNode(BaseNode):
         workflow = self._build_sub_workflow(state)
         
         # 执行子工作流
-        result = workflow.invoke(state)
+        result = workflow.invoke(
+            state,
+            config={"callbacks": [self.langfuse_handler]}
+        )
         
         # 处理结果
         if isinstance(result, dict):

+ 3 - 0
src/datasets/parser/workflows/dynamic_dimension_workflow.py

@@ -201,6 +201,7 @@ class DynamicDimensionWorkflow:
             elif decomposition_method == 1:
                 skill_node = DimensionPageSplitNode(
                     dimension_id=dim_id,
+                    langfuse_handler=self.langfuse_handler,
                     model_name=self.model_name,
                     max_workers=optimal_workers  # 使用优化后的线程数
                 )
@@ -208,6 +209,7 @@ class DynamicDimensionWorkflow:
                 # 只有第一个滑动窗口维度需要执行拆分,后续维度复用结果
                 skill_node = DimensionSlidingWindowNode(
                     dimension_id=dim_id,
+                    langfuse_handler=self.langfuse_handler,
                     model_name=self.model_name,
                     max_workers=optimal_workers,  # 使用优化后的线程数(同步版本)
                     max_concurrent=self.max_concurrent,  # 异步版本的并发数
@@ -224,6 +226,7 @@ class DynamicDimensionWorkflow:
                 logger.warning(f"未知的分解方法: {decomposition_method},使用默认分页模式")
                 skill_node = DimensionPageSplitNode(
                     dimension_id=dim_id,
+                    langfuse_handler=self.langfuse_handler,
                     model_name=self.model_name,
                     max_workers=optimal_workers  # 使用优化后的线程数
                 )