
Multi-dimension flow split

yingge 3 months ago
parent
commit
a192ae40cc

+ 13 - 1
doc/init.sql

@@ -45,4 +45,16 @@ CREATE TABLE IF NOT EXISTS api_keys (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
 -- Insert a sample API key (for testing only)
-INSERT IGNORE INTO api_keys (api_key, is_active) VALUES ('sk-test1234567890', TRUE);
+INSERT IGNORE INTO api_keys (api_key, is_active) VALUES ('sk-test1234567890', TRUE);
+
+CREATE TABLE IF NOT EXISTS ragflow_user (
+    id INT AUTO_INCREMENT PRIMARY KEY COMMENT 'Primary key ID',
+    user_id VARCHAR(64) NOT NULL UNIQUE COMMENT 'RAGFlow user ID',
+    api_key VARCHAR(255) NOT NULL UNIQUE COMMENT 'RAGFlow API key',
+    created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
+    expired_at DATETIME NULL COMMENT 'Expiration time',
+    is_active BOOLEAN DEFAULT TRUE COMMENT 'Whether the key is active',
+    INDEX idx_api_key (api_key),
+    INDEX idx_is_active (is_active)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+
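
For reference, a minimal sketch of how a caller could validate a key against the new ragflow_user table. The pymysql client and the connection parameters are illustrative assumptions, not part of this commit:

import pymysql

def get_ragflow_api_key(user_id: str):
    # Connection parameters below are placeholders
    conn = pymysql.connect(host="localhost", user="root", password="...", database="rag")
    try:
        with conn.cursor() as cur:
            # Only return keys that are active and not yet expired
            cur.execute(
                "SELECT api_key FROM ragflow_user "
                "WHERE user_id = %s AND is_active = TRUE "
                "AND (expired_at IS NULL OR expired_at > NOW())",
                (user_id,),
            )
            row = cur.fetchone()
            return row[0] if row else None
    finally:
        conn.close()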

+ 5 - 5
main.py

@@ -52,15 +52,15 @@ async def main_lifespan(app: FastAPI):
     logger.info("✅ Prompt-dimension vector DB tables/indexes initialized")
 
     # 5. Start the Chunk-update scheduled job
-    from src.job.chunk_update_job import start_scheduler, shutdown_scheduler
-    start_scheduler()
-    logger.info("✅ Chunk update scheduler started")
+    # from src.job.chunk_update_job import start_scheduler, shutdown_scheduler
+    # start_scheduler()
+    # logger.info("✅ Chunk update scheduler started")
     
     yield
 
     # 1. Shut down the Chunk-update scheduled job
-    shutdown_scheduler()
-    logger.info("✅ Chunk update scheduler shutdown")
+    # shutdown_scheduler()
+    # logger.info("✅ Chunk update scheduler shutdown")
 
     # 2. Shut down the global thread pool
     from src.utils.async_utils import ThreadPoolManager

+ 3 - 1
sql/prompt_schema.sql

@@ -2,11 +2,13 @@
 CREATE TABLE `prompt_dimensions` (
   `id` int NOT NULL AUTO_INCREMENT,
   `name` varchar(255) NOT NULL COMMENT 'Dimension name, e.g. summary generation, QA generation',
+  `dataset_id` varchar(64) NOT NULL COMMENT 'Dataset ID',
   `description` text COMMENT 'Dimension description',
   `created_at` datetime DEFAULT CURRENT_TIMESTAMP,
   `updated_at` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`),
-  UNIQUE KEY `idx_name` (`name`)
+  UNIQUE KEY `idx_name` (`name`),
+  UNIQUE KEY `idx_dataset_id` (`dataset_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='Prompt dimension table';
 
 -- Prompt version table
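
Because each dimension row now carries its RAGFlow dataset_id, the per-dimension vector table name can be derived from the row alone. A sketch mirroring PromptService._get_table_name further down in this commit (the prefix value is the configured default):

def table_name_for(dimension: dict, prefix: str = "ragbook") -> str:
    # e.g. {"id": 1, "dataset_id": "abc123"} -> "ragbook_1_abc123"
    return f"{prefix}_{dimension['id']}_{dimension['dataset_id']}"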

+ 79 - 26
src/api/db/services/prompt_service.py

@@ -8,7 +8,7 @@
 from typing import List, Dict, Any, Optional
 from src.utils.mysql import get_global_mysql_client
 from src.utils.vector_db import get_vector_db_client
-from src.conf.settings import vector_db_settings
+from src.conf.settings import vector_db_settings, ragflow_settings
 from src.common.logging_config import get_logger
 
 logger = get_logger(__name__)
@@ -20,12 +20,12 @@ INFINITY_TABLE_COLUMNS = [
     {"name": "file_name", "type": "varchar", "default": ""},
     {"name": "page_number", "type": "int", "default": 0},
     {"name": "content", "type": "varchar", "default": ""},
-    {"name": "doc_content", "type": "varchar", "default": ""},
     {"name": "image_path", "type": "varchar", "default": ""},
-    {"name": "dataset_id", "type": "varchar", "default": ""},
     {"name": "document_id", "type": "varchar", "default": ""},
+    {"name": "chunk_id", "type": "varchar", "default": ""},
     {"name": "ability_tags", "type": "varchar", "default": ""},
     {"name": "content_tag", "type": "varchar", "default": ""},
+    {"name": "metadata", "type": "varchar", "default": ""},
     {"name": "dense_vector_1024", "type": "vector,1024,float"},
 ]
 
@@ -35,13 +35,13 @@ ES_INDEX_MAPPINGS = {
         "id": {"type": "keyword"},
         "file_name": {"type": "keyword"},
         "page_number": {"type": "integer"},
-        "content": {"type": "text", "analyzer": "standard"},
-        "doc_content": {"type": "text", "analyzer": "standard"},
+        "content": {"type": "text", "analyzer": "ik_smart"},
         "image_path": {"type": "keyword"},
-        "dataset_id": {"type": "keyword"},
         "document_id": {"type": "keyword"},
+        "chunk_id": {"type": "keyword"},
         "ability_tags": {"type": "keyword"},
         "content_tag": {"type": "keyword"},
+        "metadata": {"type": "object"},
         "dense_vector_1024": {
             "type": "dense_vector",
             "dims": 1024,
@@ -50,6 +50,13 @@ ES_INDEX_MAPPINGS = {
         }
     }
 }
+# Elasticsearch index settings
+ES_INDEX_SETTINGS = {
+    "index": {
+        "number_of_shards": 2,
+        "number_of_replicas": 0
+    }
+}
 
 
 class PromptService:
@@ -66,9 +73,9 @@ class PromptService:
             self._vector_client = get_vector_db_client()
         return self._vector_client
     
-    def _get_table_name(self, dimension_id: int) -> str:
+    def _get_table_name(self, dimension: Dict) -> str:
         """Get the table/index name for a dimension."""
-        return f"book_{dimension_id}"
+        return f"{ragflow_settings.custom_dataset_prefix}_{dimension['id']}_{dimension['dataset_id']}"
     
     def init_vector_db_tables(self):
         """
@@ -115,7 +122,7 @@ class PromptService:
             client = self._get_vector_client()
             
             for dim in dimensions:
-                index_name = self._get_table_name(dim['id'])
+                index_name = self._get_table_name(dim)
                 if not client.index_exists(index_name):
                     self._create_es_index(index_name)
                     logger.info(f"✅ Created ES index: {index_name} (dimension: {dim['name']})")
@@ -135,27 +142,29 @@ class PromptService:
     def _create_es_index(self, index_name: str):
         """Create an Elasticsearch index."""
         client = self._get_vector_client()
-        client.create_index(
+        res = client.create_index(
             index_name=index_name,
-            mappings=ES_INDEX_MAPPINGS
+            mappings=ES_INDEX_MAPPINGS,
+            settings=ES_INDEX_SETTINGS
         )
+        logger.info(f"✅ Created ES index: {index_name} (response: {res})")
     
-    def _create_vector_db_table(self, dimension_id: int, dimension_name: str):
+    def _create_vector_db_table(self, dimension: Dict):
         """
         Create the vector DB table/index for a dimension.
         
         Chooses Infinity or Elasticsearch automatically based on configuration.
         """
         db_type = vector_db_settings.vector_db_type
-        table_name = self._get_table_name(dimension_id)
-        
+        table_name = self._get_table_name(dimension)
         try:
             if db_type == "infinity":
                 self._create_infinity_table(table_name)
-                logger.info(f"✅ Created Infinity table: {table_name} (dimension: {dimension_name})")
+                logger.info(f"✅ Created Infinity table: {table_name} (dimension: {dimension['name']})")
             elif db_type == "es":
                 self._create_es_index(table_name)
-                logger.info(f"✅ Created ES index: {table_name} (dimension: {dimension_name})")
+                logger.info(f"✅ Created ES index: {table_name} (dimension: {dimension['name']})")
         except Exception as e:
             logger.error(f"Failed to create vector DB table/index: {str(e)}")
     
@@ -165,7 +174,7 @@ class PromptService:
         """
         Add a dimension.
         
-        After creation, the corresponding vector DB table/index (book_{dimension_id}) is created automatically.
+        After creation, the corresponding vector DB table/index ({custom_dataset_prefix}_{dimension_id}_{dataset_id}) is created automatically.
         
         Args:
             name: dimension name
@@ -174,17 +183,36 @@ class PromptService:
         Returns:
             the newly created dimension
         """
+        # 1. Create the RAGFlow dataset first
+        from src.utils.ragflow.ragflow_service import RAGFlowService
+        from src.conf.rag_parser_config import RagParserDefaults
+        
+        # Read the API key from settings rather than hardcoding a secret
+        ragflow_service = RAGFlowService(api_key=ragflow_settings.ragflow_api_key)
+        logger.info(f"Creating RAGFlow dataset: {name}")
+        
+        dataset = ragflow_service.create_dataset(
+            name=name,
+            description=description or f"Dimension: {name}",
+            permission=RagParserDefaults.DATASET_PERMISSION,
+            chunk_method=RagParserDefaults.DATASET_CHUNK_METHOD,
+            parser_config=RagParserDefaults.DATASET_CONFIG_DICT
+        )
+        
+        dataset_id = dataset['id']
+        logger.info(f"RAGFlow dataset created, ID: {dataset_id}")
+        
+        # 2. Store the dimension in MySQL together with its dataset_id
         sql = """
-            INSERT INTO prompt_dimensions (name, description)
-            VALUES (%s, %s)
+            INSERT INTO prompt_dimensions (name, description, dataset_id)
+            VALUES (%s, %s, %s)
         """
-        self._db.execute(sql, [name, description])
+        self._db.execute(sql, [name, description, dataset_id])
         
-        # Fetch the newly created dimension
+        # 3. Fetch the newly created dimension
         dimension = self.get_dimension_by_name(name)
-        
-        # Create the corresponding vector DB table/index
-        self._create_vector_db_table(dimension['id'], name)
+
+        # 4. Create the corresponding vector DB table/index
+        self._create_vector_db_table(dimension)
         
         return dimension
     
@@ -203,6 +231,12 @@ class PromptService:
         sql = "SELECT * FROM prompt_dimensions ORDER BY created_at DESC"
         return self._db.fetch_all(sql)
     
+    def get_dataset_id_by_dimension_id(self, dimension_id: int) -> Optional[str]:
+        """Get the RAGFlow dataset ID for a given dimension ID."""
+        sql = "SELECT dataset_id FROM prompt_dimensions WHERE id = %s"
+        result = self._db.fetch_one(sql, [dimension_id])
+        return result['dataset_id'] if result else None
+
     def update_dimension(self, dimension_id: int, name: str = None, description: str = None) -> int:
         """Update dimension info."""
         updates = []
@@ -323,6 +357,25 @@ class PromptService:
         result = self._db.fetch_one(sql, [dimension_name])
         return result['content'] if result else None
     
+    def get_active_dimension_by_id(self, dimension_id: int) -> Optional[Dict[str, Any]]:
+        """
+        Get the currently active prompt content and dataset ID for a dimension.
+        
+        Args:
+            dimension_id: dimension ID
+            
+        Returns:
+            dict with the prompt content and dataset ID, or None if not found
+        """
+        sql = """
+            SELECT pv.content, pd.dataset_id 
+            FROM prompt_versions pv
+            JOIN prompt_dimensions pd ON pv.dimension_id = pd.id
+            WHERE pv.dimension_id = %s AND pv.is_active = 1
+        """
+        result = self._db.fetch_one(sql, [dimension_id])
+        return result
+    
     def get_active_prompt_by_id(self, dimension_id: int) -> Optional[str]:
         """
         Get the currently active prompt content for a dimension ID.
@@ -339,7 +392,7 @@ class PromptService:
         """
         result = self._db.fetch_one(sql, [dimension_id])
         return result['content'] if result else None
-    
+
     def set_active_version(self, dimension_id: int, version_number: int) -> int:
         """Set the active version."""
         # Deactivate the currently active version first
@@ -372,4 +425,4 @@ def get_prompt_service() -> PromptService:
     global _prompt_service
     if _prompt_service is None:
         _prompt_service = PromptService()
-    return _prompt_service
+    return _prompt_service
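
Taken together, add_dimension now provisions the RAGFlow dataset, the MySQL row, and the vector table/index in one call. A hedged usage sketch (the dimension name is illustrative, and the returned row dict is assumed to expose 'id'):

from src.api.db.services.prompt_service import get_prompt_service

service = get_prompt_service()
dimension = service.add_dimension(name="summary-generation", description="Summary prompts")

# A workflow can later resolve the active prompt and its dataset in one query
active = service.get_active_dimension_by_id(dimension['id'])
if active:
    prompt_content, dataset_id = active['content'], active['dataset_id']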

+ 30 - 38
src/api/sdk/dataset_manage.py

@@ -8,6 +8,7 @@
 - Dataset creation
 """
 
+import io
 import tempfile
 import os
 from fastapi import FastAPI, UploadFile, File, Form
@@ -122,11 +123,11 @@ async def pdf_parse_v2(
         if not file.filename.endswith((".pdf", ".PDF")):
             return Result.error(code=400, message="Only PDF files are supported")
         
-        # Save the file to a temp directory
+        # Read the file content into memory
         file_content = await file.read()
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
-            tmp_file.write(file_content)
-            pdf_path = tmp_file.name
+        original_filename = os.path.basename(file.filename)
+        pdf_content = io.BytesIO(file_content)
+        pdf_content.seek(0)
         
         logger.info(f"Starting PDF parse V2: {file.filename}")
         
@@ -136,15 +137,15 @@ async def pdf_parse_v2(
                 from src.datasets.parser.workflows import PDFParsingWorkflowV2
                 workflow = PDFParsingWorkflowV2()
                 result = workflow.run(
-                    pdf_path=pdf_path,
+                    pdf_content=pdf_content,
+                    original_filename=original_filename,
                     page_dataset_id=page_dataset_id,
                     dataset_name=dataset_name
                 )
                 return result
             finally:
-                # Clean up the temp file
-                if os.path.exists(pdf_path):
-                    os.unlink(pdf_path)
+                # Release the in-memory buffer
+                pdf_content.close()
         
         # Submit to the task queue
         task_queue = get_task_queue()
@@ -192,11 +193,13 @@ async def qa_parse_v2(
         if not file.filename.endswith((".pdf", ".PDF")):
             return Result.error(code=400, message="Only PDF files are supported")
         
-        # Save the file to a temp directory
+        # Save the file to the temp directory under its original (sanitized) filename
         file_content = await file.read()
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
+        original_filename = os.path.basename(file.filename)
+        temp_dir = tempfile.gettempdir()
+        pdf_path = os.path.join(temp_dir, original_filename)
+        with open(pdf_path, 'wb') as tmp_file:
             tmp_file.write(file_content)
-            pdf_path = tmp_file.name
         
         logger.info(f"Starting QA parse V2: {file.filename}")
         
@@ -259,11 +262,13 @@ async def image_parse_v2(
         if not file.filename.endswith((".zip", ".ZIP")):
             return Result.error(code=400, message="Only ZIP archives are supported")
         
-        # Save the file to a temp directory
+        # Save the file to the temp directory under its original (sanitized) filename
         file_content = await file.read()
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
+        original_filename = os.path.basename(file.filename)
+        temp_dir = tempfile.gettempdir()
+        zip_path = os.path.join(temp_dir, original_filename)
+        with open(zip_path, 'wb') as tmp_file:
             tmp_file.write(file_content)
-            zip_path = tmp_file.name
         
         logger.info(f"Starting image parse V2: {file.filename}")
         
@@ -341,11 +346,8 @@ async def clear_completed_tasks():
 
 @app.post("/v2/dynamic-parse")
 async def dynamic_parse_v2(
-    file: UploadFile = File(..., description="ZIP archive of images"),
-    dimension_ids: str = Form(..., description="Dimension ID list, comma-separated, e.g. 1,2,3"),
-    book_name: str = Form(..., description="Book name"),
-    dataset_id: str = Form(..., description="Dataset ID"),
-    document_id: str = Form(default="", description="Document ID")
+    file: UploadFile = File(..., description="PDF document"),
+    dimension_ids: str = Form(..., description="Dimension ID list, comma-separated, e.g. 1,2,3")
 ):
     """
     Dynamic multi-dimension parsing endpoint (V2 workflow)
@@ -373,11 +375,13 @@
         if not dim_ids:
             return Result.error(code=400, message="Dimension ID list cannot be empty")
         
-        # Save the file to a temp directory
+        # Save the file to the temp directory under its original (sanitized) filename
         file_content = await file.read()
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
+        original_filename = os.path.basename(file.filename)
+        temp_dir = tempfile.gettempdir()
+        pdf_path = os.path.join(temp_dir, original_filename)
+        with open(pdf_path, 'wb') as tmp_file:
             tmp_file.write(file_content)
-            zip_path = tmp_file.name
         
         logger.info(f"Starting dynamic multi-dimension parse: {file.filename}, dimensions: {dim_ids}")
         
@@ -385,30 +389,18 @@ async def dynamic_parse_v2(
         def run_dynamic_workflow():
             try:
                 from src.datasets.parser.workflows.dynamic_dimension_workflow import DynamicDimensionWorkflow
-                from src.utils.file.image_util import image_util
-                
-                # Unzip the images and build the page list
-                image_urls = image_util.process_image_zip(zip_path, book_name)
-                image_pages = [
-                    {"page_number": i + 1, "image_url": url}
-                    for i, url in enumerate(image_urls)
-                ]
                 
                 # Run the workflow
                 workflow = DynamicDimensionWorkflow()
                 result = workflow.run(
-                    dimension_ids=dim_ids,
-                    image_pages=image_pages,
-                    split_pages=image_pages,
-                    document_id=document_id or f"{book_name}_{dataset_id}",
-                    dataset_id=dataset_id,
-                    pdf_path=""
+                    pdf_path=pdf_path,
+                    dimension_ids=dim_ids
                 )
                 return result
             finally:
                 # Clean up the temp file
-                if os.path.exists(zip_path):
-                    os.unlink(zip_path)
+                if os.path.exists(pdf_path):
+                    os.unlink(pdf_path)
         
         # Submit to the task queue
         task_queue = get_task_queue()
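
A client-side sketch of the slimmed-down /v2/dynamic-parse endpoint, which now takes only the PDF and the dimension IDs; the host and port are assumptions:

import requests

with open("book.pdf", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/v2/dynamic-parse",  # host/port are assumptions
        files={"file": ("book.pdf", f, "application/pdf")},
        data={"dimension_ids": "1,2,3"},
    )
print(resp.json())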

+ 3 - 1
src/conf/settings.py

@@ -39,6 +39,8 @@ class RagflowSettings(BaseSettings):
     """RAGFlow settings"""
     ragflow_api_url: str = Field(default="http://192.168.16.134/", alias="RAGFLOW_API_URL")
     ragflow_api_key: str = Field(default="", alias="RAGFLOW_API_KEY")
+    ragflow_dataset_prefix: str = Field(default="ragflow_", alias="RAGFLOW_DATASET_PREFIX")
+    custom_dataset_prefix: str = Field(default="ragbook", alias="CUSTOM_DATASET_PREFIX")
     dataset_id: str = Field(default="", alias="DATASET_ID")
     ragflow_user_name: str = Field(default="", alias="RAGFLOW_USER_NAME")
     ragflow_passwd: str = Field(default="", alias="RAGFLOW_PASSWD")
@@ -78,7 +80,7 @@ class VectorDBSettings(BaseSettings):
     infinity_page_dataset_id: str = Field(default="", alias="INFINITY_PAGE_DATASET_ID")
     infinity_page_table_name: str = Field(default="", alias="INFINITY_PAGE_TABLE_NAME")
     infinity_ragflow_database: str = Field(default="default_db", alias="INFINITY_RAGFLOW_DATABASE")
-    infinity_dataset_prefix: str = Field(default="ragbook_", alias="INFINITY_DATASET_PREFIX")
+    infinity_dataset_prefix: str = Field(default="ragbook", alias="INFINITY_DATASET_PREFIX")
     
     model_config = SettingsConfigDict(
         env_file=".env",
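
The two new prefixes are read from the environment; a sketch of the corresponding .env entries (values mirror the defaults above):

RAGFLOW_DATASET_PREFIX=ragflow_
CUSTOM_DATASET_PREFIX=ragbook
INFINITY_DATASET_PREFIX=ragbook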

+ 6 - 0
src/datasets/parser/nodes/__init__.py

@@ -18,6 +18,9 @@ from src.datasets.parser.nodes.qa_generate_node import QAGenerateNode
 from src.datasets.parser.nodes.complete_node import CompleteNode
 from src.datasets.parser.nodes.pdf_ocr_node import PDFOCRNode
 from src.datasets.parser.nodes.export_csv_node import ExportCSVNode
+from src.datasets.parser.nodes.prompt_retrieval_node import PromptRetrievalNode
+from src.datasets.parser.nodes.table_name_generation_node import TableNameGenerationNode
+from src.datasets.parser.nodes.dimension_result_node import DimensionResultNode
 
 __all__ = [
     "PDFSplitNode",
@@ -32,4 +35,7 @@ __all__ = [
     "CompleteNode",
     "PDFOCRNode",
     "ExportCSVNode",
+    "PromptRetrievalNode",
+    "TableNameGenerationNode",
+    "DimensionResultNode",
 ]

+ 88 - 0
src/datasets/parser/nodes/dimension_result_node.py

@@ -0,0 +1,88 @@
+"""
+Dimension result recording node
+"""
+
+from typing import Dict, Any
+from src.datasets.parser.core.base import BaseNode
+from src.common.logging_config import get_logger
+
+
+logger = get_logger(__name__)
+
+
+class DimensionResultNode(BaseNode):
+    """
+    Dimension result recording node
+    
+    Records the processing result of a dimension into the state.
+    """
+    
+    def __init__(self, dimension_id: int):
+        """
+        Initialize the dimension result recording node
+        
+        Args:
+            dimension_id: dimension ID
+        """
+        self.dimension_id = dimension_id
+    
+    @property
+    def name(self) -> str:
+        """Node name"""
+        return f"dimension_result_dim_{self.dimension_id}"
+    
+    def execute(self, state: Any) -> Dict[str, Any]:
+        """
+        Record the dimension result
+        
+        Args:
+            state: object holding the workflow state
+            
+        Returns:
+            dict with the updated dimension results
+        """
+        logger.info(f"[Result-{self.dimension_id}] Recording dimension result")
+        
+        # Get the existing dimension results
+        dim_results = dict(getattr(state, 'dimension_results', {}) or {})
+        
+        # Collect information produced during processing
+        prompt_error = getattr(state, 'prompt_error', None)
+        table_error = getattr(state, 'table_error', None)
+        parsed_results = getattr(state, 'parsed_results', [])
+        vectorized_pages = getattr(state, 'vectorized_pages', 0)
+        table_name = getattr(state, 'table_name', None)
+        
+        # Build the dimension result
+        dimension_result = {
+            "dimension_id": self.dimension_id,
+            "success": not (prompt_error or table_error),
+            "skipped": bool(prompt_error or table_error),
+            "reason": prompt_error or table_error or None
+        }
+        
+        # On success, add the details
+        if not dimension_result["skipped"]:
+            dimension_result.update({
+                "table_name": table_name,
+                "parsed_pages": len(parsed_results),
+                "vectorized_pages": vectorized_pages
+            })
+        
+        # Update the dimension results dict
+        dim_results[self.dimension_id] = dimension_result
+        
+        # Compute the total number of vectorized pages
+        total_vectorized = sum(
+            result.get('vectorized_pages', 0) 
+            for result in dim_results.values() 
+            if not result.get('skipped')
+        )
+        
+        logger.info(f"[Result-{self.dimension_id}] Dimension result recorded, success: {dimension_result['success']}")
+        
+        return {
+            "dimension_results": dim_results,
+            "total_vectorized_pages": total_vectorized,
+            "dimension_id": self.dimension_id
+        }
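
A minimal sketch of how the node folds a dimension's outcome into the shared state; SimpleNamespace stands in for the real workflow state object:

from types import SimpleNamespace
from src.datasets.parser.nodes import DimensionResultNode

node = DimensionResultNode(dimension_id=1)
state = SimpleNamespace(
    dimension_results={}, prompt_error=None, table_error=None,
    parsed_results=[{"content": "..."}], vectorized_pages=1, table_name="ragbook_1_abc123",
)
update = node.execute(state)
# update["dimension_results"][1]["success"] is True; skipped dimensions are
# excluded from update["total_vectorized_pages"]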

+ 19 - 11
src/datasets/parser/nodes/export_csv_node.py

@@ -1,8 +1,10 @@
 import tempfile
 import csv
+import os
 from typing import Dict, Any
 from src.datasets.parser.core.base import BaseNode
 from src.common.logging_config import get_logger
+from src.utils.file.file_utils import generate_unique_filename
 
 logger = get_logger(__name__)
 
@@ -26,18 +28,24 @@ class ExportCSVNode(BaseNode):
         
         logger.info(f"Exporting {len(qa_pairs)} QA pairs to CSV")
         
+        # Get the original filename from the state
+        pdf_path = getattr(state, 'pdf_path', '')
+        if pdf_path:
+            # Extract the filename (without extension)
+            original_filename = os.path.splitext(os.path.basename(pdf_path))[0]
+            csv_filename = f"{original_filename}.csv"
+        else:
+            csv_filename = f"{generate_unique_filename()}.csv"
+        
         # Create the CSV file in the temp directory
-        with tempfile.NamedTemporaryFile(
-            mode='w',
-            suffix='.csv',
-            delete=False,
-            encoding='utf-8',
-            newline=''
-        ) as f:
-            writer = csv.DictWriter(f, fieldnames=['question', 'answer'])
-            writer.writeheader()
-            writer.writerows(qa_pairs)
-            csv_path = f.name
+        temp_dir = tempfile.gettempdir()
+        csv_path = os.path.join(temp_dir, csv_filename)
+        
+        with open(csv_path, 'w', encoding='utf-8', newline='') as f:
+            writer = csv.writer(f)
+            # Write the data rows directly, without a header row
+            for qa in qa_pairs:
+                writer.writerow([qa.get('question', ''), qa.get('answer', '')])
         
         logger.info(f"CSV export complete: {csv_path}")
         return {"csv_path": csv_path}

+ 15 - 7
src/datasets/parser/nodes/image_parse_node.py

@@ -12,6 +12,7 @@ from src.model.qwen_vl import QWenVLParser
 from src.conf.settings import model_settings
 from src.common.logging_config import get_logger
 from src.utils.async_utils import ThreadPoolManager
+from src.utils.markdown_utils import parse_markdown_json
 
 logger = get_logger(__name__)
 
@@ -34,8 +35,7 @@ class ImageParseNode(BaseNode):
     def __init__(
         self,
         model_name: Optional[str] = None,
-        max_workers: int = 5,
-        prompt_template: Optional[str] = None
+        max_workers: int = 5
     ):
         """
         Initialize the image parse node
@@ -47,7 +47,6 @@ class ImageParseNode(BaseNode):
         """
         self.model_name = model_name or model_settings.vl_model_name
         self.max_workers = max_workers  # kept for compatibility, no longer used
-        self.prompt_template = prompt_template or self._default_prompt_template()
     
     @property
     def name(self) -> str:
@@ -68,7 +67,7 @@ class ImageParseNode(BaseNode):
             }}
         """
     
-    def _parse_single_page(self, page: Dict[str, Any]) -> Dict[str, Any]:
+    def _parse_single_page(self, page: Dict[str, Any], prompt_template: str) -> Dict[str, Any]:
         """
         Parse a single page
         
@@ -81,13 +80,22 @@ class ImageParseNode(BaseNode):
         page_number = page.get("page_number", 0)
         image = page.get("image")
         
-        prompt = self.prompt_template.format(page_number=page_number)
+        prompt = prompt_template.format(page_number=page_number)
         
         logger.debug(f"Parsing page {page_number}")
         
         try:
             parser = QWenVLParser(self.model_name)
             result = parser.parse_image(image, page_number, prompt)
+            
+            # Handle markdown-wrapped JSON output
+            # parsed_content = parse_markdown_json(result)
+            # if parsed_content:
+            #     result = parsed_content
+            # else:
+            #     # If parsing fails, keep the original content
+            #     logger.warning(f"Failed to parse JSON content, keeping the original")
+            
             logger.debug(f"Page {page_number} parsed")
             return result
         except Exception as e:
@@ -110,7 +118,7 @@ class ImageParseNode(BaseNode):
         """
         # Support multiple state field names
         pages = getattr(state, 'split_pages', None) or getattr(state, 'image_pages', [])
-        
+        prompt_template = getattr(state, 'dimension_prompt', None) or self._default_prompt_template()
         if not pages:
             logger.warning("No pages to parse")
             return {"parsed_results": [], "processed_pages": 0}
@@ -122,7 +130,7 @@ class ImageParseNode(BaseNode):
         # Use the global thread pool
         pool = ThreadPoolManager.get_pool("parser")
         future_to_page = {
-            pool.submit(self._parse_single_page, page): page
+            pool.submit(self._parse_single_page, page, prompt_template): page
             for page in pages
         }
         
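
Because the per-dimension prompt is passed through str.format(page_number=...), any literal braces in a prompt stored in the database must be doubled, exactly as the default template does with {{ and }}. A quick illustration of the pitfall:

template = 'Page {page_number}: return JSON like {{"content": "..."}}'
print(template.format(page_number=3))  # works: literal braces are escaped

broken = 'Page {page_number}: return {"content": "..."}'
# broken.format(page_number=3) raises KeyError because {"content"...} is
# parsed as a replacement field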

+ 1 - 1
src/datasets/parser/nodes/pdf_ocr_node.py

@@ -68,7 +68,7 @@ class PDFOCRNode(BaseNode):
         
         # Split the PDF into images
         splitter = PDFSplitter()
-        pages = splitter.split_pdf(pdf_path=pdf_path, is_upload=False)
+        pages = splitter.split_pdf(pdf_path=pdf_path, dataset_id=state.dataset_id, is_upload=False)
         
         if not pages:
             logger.warning("PDF拆分后没有页面")

+ 15 - 5
src/datasets/parser/nodes/pdf_split_node.py

@@ -36,7 +36,7 @@ class PDFSplitNode(BaseNode):
         Perform the PDF split
         
         Args:
-            state: state containing pdf_path
+            state: state containing pdf_path or pdf_content
             
         Returns:
             update dict containing split_pages
@@ -44,14 +44,24 @@ class PDFSplitNode(BaseNode):
         from src.datasets.parser.pdf_parser.pdf_splitter import PDFSplitter
         
         pdf_path = getattr(state, 'pdf_path', None)
-        if not pdf_path:
-            raise ValueError("State must contain 'pdf_path' field")
+        pdf_content = getattr(state, 'pdf_content', None)
+        original_filename = getattr(state, 'original_filename', None)
         
-        logger.info(f"Splitting PDF: {pdf_path}")
+        if not pdf_path and not pdf_content:
+            raise ValueError("State must contain either 'pdf_path' or 'pdf_content' field")
+        
+        if pdf_content:
+            logger.info(f"Splitting PDF: {original_filename or 'in-memory PDF'}")
+        else:
+            logger.info(f"Splitting PDF: {pdf_path}")
         
         # Split the PDF
         splitter = PDFSplitter()
-        split_pages = splitter.split_pdf(pdf_path)
+        split_pages = splitter.split_pdf(
+            pdf_path=pdf_path,
+            pdf_content=pdf_content,
+            original_filename=original_filename
+        )
         
         logger.info(f"PDF split complete, {len(split_pages)} pages")
         

+ 72 - 0
src/datasets/parser/nodes/prompt_retrieval_node.py

@@ -0,0 +1,72 @@
+"""
+Prompt retrieval node
+"""
+
+from typing import Dict, Any, Optional
+from src.datasets.parser.core.base import BaseNode
+from src.api.db.services.prompt_service import get_prompt_service
+from src.common.logging_config import get_logger
+
+
+logger = get_logger(__name__)
+
+
+class PromptRetrievalNode(BaseNode):
+    """
+    Prompt retrieval node
+    
+    Fetches the active prompt for a given dimension from the database.
+    """
+    
+    def __init__(self, dimension_id: int):
+        """
+        Initialize the prompt retrieval node
+        
+        Args:
+            dimension_id: dimension ID
+        """
+        self.dimension_id = dimension_id
+        self._prompt_service = None
+    
+    @property
+    def name(self) -> str:
+        """Node name"""
+        return f"prompt_retrieval_dim_{self.dimension_id}"
+    
+    @property
+    def prompt_service(self):
+        """Lazily load the prompt service"""
+        if self._prompt_service is None:
+            self._prompt_service = get_prompt_service()
+        return self._prompt_service
+    
+    def execute(self, state: Any) -> Dict[str, Any]:
+        """
+        Fetch the active prompt
+        
+        Args:
+            state: object holding the workflow state
+            
+        Returns:
+            dict with the prompt information
+        """
+        logger.info(f"[Prompt-{self.dimension_id}] Fetching prompt")
+        
+        # The query may return None when the dimension has no active version
+        result = self.prompt_service.get_active_dimension_by_id(self.dimension_id)
+        
+        if not result or not result.get('content'):
+            logger.warning(f"[Prompt-{self.dimension_id}] No active prompt")
+            return {
+                "dimension_prompt": None,
+                "prompt_error": "no_active_prompt",
+                "dimension_id": self.dimension_id
+            }
+        
+        logger.info(f"[Prompt-{self.dimension_id}] Prompt fetched successfully")
+        
+        return {
+            "dimension_prompt": result.get('content'),
+            "dataset_id": result.get('dataset_id'),
+            "dimension_id": self.dimension_id
+        }

+ 1 - 1
src/datasets/parser/nodes/qa_generate_node.py

@@ -75,7 +75,7 @@ class QAGenerateNode(BaseNode):
             2. Answers should be accurate, complete, and come directly from the text
             3. Questions should be natural, like those a real user would ask
             4. Avoid questions that are too simple or too complex
-            5. If the JSON is incomplete, drop the incomplete elements and return only the complete JSON array
+            5. The output must be a complete, valid JSON array
 
             Text content:
             {chunk}

+ 18 - 17
src/datasets/parser/nodes/ragflow_nodes.py

@@ -116,14 +116,13 @@ class RAGFlowDocumentUploadNode(BaseNode):
         - document_id: document ID
     """
     
-    def __init__(self, target_field: str = "document_id"):
-        """
-        Initialize the document upload node
-        
-        Args:
-            target_field: target field name for storing the document ID
-        """
-        self.target_field = target_field
+    def __init__(self):
+        """
+        Initialize the document upload node
+        """
         self.ragflow_service = RAGFlowService()
     
     @property
@@ -141,12 +140,13 @@ class RAGFlowDocumentUploadNode(BaseNode):
             update dict containing document_id
         """
         dataset_id = getattr(state, 'dataset_id', '')
-        file_path = getattr(state, 'pdf_path', '') or getattr(state, 'csv_path', '')
+        # Prefer the CSV export over the raw PDF when both are present
+        file_path = getattr(state, 'csv_path', '') or getattr(state, 'pdf_path', '')
         
         if not dataset_id:
             raise ValueError("State must contain 'dataset_id' field")
         if not file_path:
-            raise ValueError("State must contain 'pdf_path' or 'csv_path' field")
+            raise ValueError("State must contain 'csv_path' or 'pdf_path' field")
         
         logger.info(f"Uploading document to dataset {dataset_id}: {file_path}")
         
@@ -158,7 +158,7 @@ class RAGFlowDocumentUploadNode(BaseNode):
         if document_info_list and len(document_info_list) > 0:
             document_id = document_info_list[0]["id"]
             logger.info(f"Document uploaded, ID: {document_id}")
-            return {self.target_field: document_id}
+            return {"document_id": document_id}
         
     raise Exception("Document upload failed: no valid document info returned")
 
@@ -246,8 +246,8 @@ class RAGFlowChunkNode(BaseNode):
         from src.conf.settings import vector_db_settings
         import os
         
-        page_dataset_id = getattr(state, 'page_dataset_id', '')
-        page_document_id = getattr(state, 'page_document_id', '')
+        page_dataset_id = getattr(state, 'dataset_id', '')
+        page_document_id = getattr(state, 'document_id', '')
         parsed_results = getattr(state, 'parsed_results', [])
         split_pages = getattr(state, 'split_pages', [])
         
@@ -258,7 +258,7 @@ class RAGFlowChunkNode(BaseNode):
             text = parsed_result.get("content", "")
             image_path = split_pages[i].get("image_path", "") if i < len(split_pages) else ""
             
-            img_id = f"{vector_db_settings.infinity_page_dataset_id}-{os.path.basename(image_path).split('.')[0]}.png" if image_path else ""
+            # img_id = f"{page_dataset_id}-{os.path.basename(image_path).split('.')[0]}.png" if image_path else ""
             
             chunk = self.ragflow_service.create_chunk(
                 dataset_id=page_dataset_id,
@@ -266,17 +266,18 @@ class RAGFlowChunkNode(BaseNode):
                 content=text
             )
             chunk_id = chunk["chunk"]["id"]
+            parsed_result["chunk_id"] = chunk_id
             logger.debug(f"Created chunk for page {page_number}, ID: {chunk_id}")
             
-            # Record into the scheduled-job table
-            if img_id:
-                get_chunk_record_service().record_chunk_add(
-                    database_name=vector_db_settings.infinity_ragflow_database,
-                    table_name=vector_db_settings.infinity_page_table_name,
-                    chunk_id=chunk_id,
-                    cond=f"id = '{chunk_id}'",
-                    data={"img_id": img_id}
-                )
+            # # Record into the scheduled-job table
+            # if img_id:
+            #     get_chunk_record_service().record_chunk_add(
+            #         database_name=vector_db_settings.infinity_ragflow_database,
+            #         table_name=vector_db_settings.infinity_page_table_name,
+            #         chunk_id=chunk_id,
+            #         cond=f"id = '{chunk_id}'",
+            #         data={"img_id": img_id}
+            #     )
         
         logger.info("Chunk creation complete")
         return {}

+ 77 - 0
src/datasets/parser/nodes/table_name_generation_node.py

@@ -0,0 +1,77 @@
+"""
+Table name generation node
+"""
+
+from typing import Dict, Any
+from src.datasets.parser.core.base import BaseNode
+# from src.api.db.services.prompt_service import get_prompt_service
+from src.common.logging_config import get_logger
+from src.conf.settings import ragflow_settings
+
+
+logger = get_logger(__name__)
+
+
+class TableNameGenerationNode(BaseNode):
+    """
+    Table name generation node
+    
+    Generates the vector table name from the dimension ID and dataset ID.
+    """
+    
+    def __init__(self, dimension_id: int):
+        """
+        Initialize the table name generation node
+        
+        Args:
+            dimension_id: dimension ID
+        """
+        self.dimension_id = dimension_id
+        # self._prompt_service = None
+    
+    @property
+    def name(self) -> str:
+        """Node name"""
+        return f"table_name_generation_dim_{self.dimension_id}"
+    
+    # @property
+    # def prompt_service(self):
+    #     """Lazily load the prompt service"""
+    #     if self._prompt_service is None:
+    #         self._prompt_service = get_prompt_service()
+    #     return self._prompt_service
+    
+    def execute(self, state: Any) -> Dict[str, Any]:
+        """
+        Generate the table name
+        
+        Args:
+            state: object holding the workflow state
+            
+        Returns:
+            dict with the table name information
+        """
+        logger.info(f"[Table-{self.dimension_id}] Generating table name")
+        
+        # The dataset_id is placed on the state by PromptRetrievalNode
+        # dataset_id = self.prompt_service.get_dataset_id_by_dimension_id(self.dimension_id)
+        
+        if not state.dataset_id:
+            logger.warning(f"[Table-{self.dimension_id}] No dataset ID found for the dimension")
+            return {
+                "table_name": None,
+                "table_error": "no_dataset_id",
+                "dimension_id": self.dimension_id
+            }
+        
+        # Generate the table name; must match PromptService._get_table_name
+        # table_name = f"{vector_db_settings.infinity_dataset_prefix}{self.dimension_id}_{state.dataset_id}"
+        table_name = f"{ragflow_settings.custom_dataset_prefix}_{self.dimension_id}_{state.dataset_id}"
+        
+        logger.info(f"[Table-{self.dimension_id}] Table name generated: {table_name}")
+        
+        return {
+            "table_name": table_name,
+            # "dataset_id": dataset_id,
+            "dimension_id": self.dimension_id
+        }

+ 7 - 5
src/datasets/parser/nodes/vectorize_node.py

@@ -37,7 +37,6 @@ class VectorizeNode(BaseNode):
     
     def __init__(
         self,
-        table_name: Optional[str] = None,
         database_name: Optional[str] = None,
         embedding_model_name: Optional[str] = None
     ):
@@ -49,7 +48,6 @@ class VectorizeNode(BaseNode):
             database_name: Infinity database name
             embedding_model_name: embedding model name
         """
-        self.table_name = table_name or vector_db_settings.infinity_table_name
         self.database_name = database_name or vector_db_settings.infinity_database
         self.embedding_model_name = embedding_model_name or model_settings.multimodal_embedding_model_name
         self._embedding_model = None
@@ -104,6 +102,7 @@ class VectorizeNode(BaseNode):
                 text = parsed_result.get("content", "")
                 image = split_pages[i].get("image") if i < len(split_pages) else None
                 image_path = split_pages[i].get("image_path", "") if i < len(split_pages) else ""
+                chunk_id = parsed_result.get("chunk_id", "")
                 
                 # 获取多模态嵌入向量
                 logger.debug(f"Generating multimodal embedding for page {page_number}...")
@@ -120,13 +119,16 @@ class VectorizeNode(BaseNode):
                 document = {
                     "id": f"{document_id}_{page_number}",
                     "file_name": file_name,
-                    "file_page_count": file_page_count,
                     "page_number": page_number,
                     "content": text,
                     "image_path": image_path,
                     "dense_vector_1024": dense_vector_1024,
                     "dataset_id": dataset_id,
-                    "document_id": document_id
+                    "document_id": document_id,
+                    "chunk_id": chunk_id,
+                    "metadata": {
+                        "file_page_count": file_page_count,
+                    }
                 }
                 
                 documents_to_store.append(document)
@@ -138,7 +140,7 @@ class VectorizeNode(BaseNode):
         if documents_to_store:
             logger.info(f"Storing {len(documents_to_store)} documents")
             result = get_client().insert(
-                table_name=self.table_name,
+                table_name=state.table_name,
                 documents=documents_to_store,
                 database_name=self.database_name
             )
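
One caveat: the ES mapping earlier in this commit types metadata as object, while the Infinity column list declares it as varchar. If the Infinity backend is used, the dict would presumably need serializing before insert; a hypothetical pre-insert step:

import json

# Hypothetical adaptation for the Infinity backend, where the
# 'metadata' column is a varchar rather than an object
document["metadata"] = json.dumps({"file_page_count": file_page_count})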

+ 104 - 51
src/datasets/parser/pdf_parser/pdf_splitter.py

@@ -2,20 +2,84 @@ import fitz
 from PIL import Image
 import io
 import os
-from typing import List, Dict, Tuple
-from src.conf.settings import vector_db_settings
+import concurrent.futures
+from typing import List, Dict
 from utils.file.minio.minio_util import MinIOUtil
+from src.utils.async_utils import ThreadPoolManager
 
 class PDFSplitter:
     """Tool for splitting a scanned PDF into page images"""
     
     @staticmethod
-    def split_pdf(pdf_path: str, is_upload: bool = True) -> List[Dict[str, any]]:
+    def _process_page(page_num: int, pdf_document: fitz.Document, pdf_filename: str, dataset_id: int, is_upload: bool) -> Dict[str, any]:
+        """
+        Process a single PDF page
+        
+        Args:
+            page_num: page index
+            pdf_document: PDF document object
+            pdf_filename: PDF filename (without extension)
+            dataset_id: dataset ID
+            is_upload: whether to upload to MinIO
+            
+        Returns:
+            Dict: dict with the page information
+        """
+        # Get the page
+        page = pdf_document[page_num]
+        # Page numbers start at 1
+        page_number = page_num + 1
+        
+        # Render the page to an image
+        # Use a relatively high resolution, DPI=300
+        pix = page.get_pixmap(dpi=300)
+        
+        # Convert the fitz pixmap to a PIL image
+        image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
+        
+        if is_upload:
+            # Initialize MinIOUtil
+            minio_util = MinIOUtil()
+            # Convert the image to a byte stream for further processing
+            image_bytes = io.BytesIO()
+            image.save(image_bytes, format='PNG')
+            image_bytes.seek(0)
+            
+            # Build the image filename
+            image_filename = f"{pdf_filename}_{page_number}.png"
+            
+            # Reset the byte stream position
+            image_bytes.seek(0)
+            
+            # Upload the image to MinIO and get its URL
+            bucket_name = str(dataset_id) if dataset_id else "bookpage"
+            image_url = minio_util.custom_upload_file(file=image_bytes, original_filename=image_filename, bucket_name=bucket_name)
+        
+            return {
+                "page_number": page_number,
+                "image": image,
+                "image_bytes": image_bytes,
+                "image_path": image_url
+            }
+        else:
+            return {
+                "page_number": page_number,
+                "image": image,
+                "image_bytes": None,
+                "image_path": None
+            }
+
+    @staticmethod
+    def split_pdf(pdf_path: str, dataset_id: int = None, is_upload: bool = True, pdf_content: io.BytesIO = None, original_filename: str = None) -> List[Dict[str, any]]:
         """
         Split the PDF into pages, convert each page to an image, record page numbers, and save the images to MinIO
         
         Args:
             pdf_path: PDF file path
+            dataset_id: dataset ID
+            is_upload: whether to upload to MinIO
+            pdf_content: PDF content (byte stream); takes precedence when provided
+            original_filename: original filename; takes precedence when provided
             
         Returns:
             List[Dict]: one entry per page; each dict contains:
@@ -25,59 +89,47 @@
                 - image_path: URL of the image saved in MinIO
         """
 
-        
+
         pdf_document = None
         try:            
             # Open the PDF file
-            pdf_document = fitz.open(pdf_path)
-            
-            # Get the PDF filename (without extension)
-            pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]
+            if pdf_content:
+                # Use the in-memory PDF content
+                pdf_document = fitz.open(stream=pdf_content, filetype="pdf")
+                # Use the provided original filename
+                if original_filename:
+                    pdf_filename = os.path.splitext(os.path.basename(original_filename))[0]
+                else:
+                    pdf_filename = "temp_pdf"
+            else:
+                # Use the file path
+                pdf_document = fitz.open(pdf_path)
+                # Get the PDF filename (without extension)
+                pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]
             
+            # Process pages in parallel via the global thread pool manager
             result = []
-            for page_num in range(len(pdf_document)):
-                # Get the page
-                page = pdf_document[page_num]
-                # Page numbers start at 1
-                page_number = page_num + 1
-                
-                # Render the page to an image
-                # Use a relatively high resolution, DPI=300
-                pix = page.get_pixmap(dpi=300)
-                
-                # Convert the fitz pixmap to a PIL image
-                image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
-                
-                if is_upload:
-                    # Initialize MinIOUtil
-                    minio_util = MinIOUtil()
-                    # Convert the image to a byte stream for further processing
-                    image_bytes = io.BytesIO()
-                    image.save(image_bytes, format='PNG')
-                    image_bytes.seek(0)
-                    
-                    # Build the image filename
-                    image_filename = f"{pdf_filename}_{page_number}.png"
-                    
-                    # Reset the byte stream position
-                    image_bytes.seek(0)
-                    
-                    # Upload the image to MinIO and get its URL
-                    image_url = minio_util.custom_upload_file(file=image_bytes, original_filename=image_filename, bucket_name=vector_db_settings.infinity_page_dataset_id)
-                
-                    result.append({
-                        "page_number": page_number,
-                        "image": image,
-                        "image_bytes": image_bytes,
-                        "image_path": image_url
-                    })
-                else:
-                    result.append({
-                        "page_number": page_number,
-                        "image": image,
-                        "image_bytes": None,
-                        "image_path": None
-                    })
+            # Submit all page-processing tasks
+            future_to_page = {
+                ThreadPoolManager.submit(
+                    "parser",
+                    PDFSplitter._process_page,
+                    page_num,
+                    pdf_document,
+                    pdf_filename,
+                    dataset_id,
+                    is_upload
+                ): page_num
+                for page_num in range(len(pdf_document))
+            }
+            
+            # Collect the results
+            for future in concurrent.futures.as_completed(future_to_page):
+                try:
+                    page_result = future.result()
+                    result.append(page_result)
+                except Exception as e:
+                    raise Exception(f"Page processing failed: {str(e)}")
             
             # Sort the result by page_number
             result.sort(key=lambda x: x["page_number"])
@@ -85,6 +137,7 @@ class PDFSplitter:
         except Exception as e:
             raise Exception(f"PDF split failed: {str(e)}")
         finally:
+            # NOTE: this shuts down the shared pools on every call; callers must recreate them afterwards
+            ThreadPoolManager.shutdown_all()
             # Ensure the PDF document is always closed
             if pdf_document is not None:
                 try:
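
Note that _process_page shares a single fitz.Document across pool threads, and PyMuPDF documents are not generally safe for concurrent access. A cautious variant would serialize the rendering step and parallelize only the PNG encode/upload; a sketch (the lock is an addition, not part of this commit):

import threading

_render_lock = threading.Lock()

def render_page(pdf_document, page_num):
    # Serialize access to the shared fitz.Document; only the MinIO
    # upload that follows this call should run concurrently
    with _render_lock:
        pix = pdf_document[page_num].get_pixmap(dpi=300)
    return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)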

+ 34 - 1
src/datasets/parser/states/parser_states.py

@@ -4,6 +4,7 @@
 Defines the state classes used by the various parsing workflows.
 """
 
+import io
 from typing import List, Dict, Any, Optional
 from pydantic import Field, ConfigDict
 from src.datasets.parser.core.base import BaseState
@@ -18,7 +19,9 @@ class PDFParsingState(BaseState):
     model_config = ConfigDict(arbitrary_types_allowed=True)
     
     # Input parameters
-    pdf_path: str = Field(..., description="PDF file path")
+    pdf_path: str = Field(default="", description="PDF file path")
+    pdf_content: Optional[io.BytesIO] = Field(default=None, description="PDF content (byte stream)")
+    original_filename: str = Field(default="", description="Original filename")
     dataset_name: str = Field(..., description="Dataset name")
     page_dataset_id: str = Field(..., description="Page dataset ID")
     
@@ -104,3 +107,31 @@ class VectorizationMixin(BaseState):
     """
     vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="List of vectorized results")
     vectorized_count: int = Field(default=0, description="Number of vectorized items")
+
+
+class DynamicDimensionState(BaseState):
+    """
+    Dynamic dimension parsing state
+    
+    Used by the dynamic multi-dimension parsing workflow; supports processing multiple dimensions.
+    """
+    # Input parameters
+    pdf_path: str = Field(default="", description="PDF file path")
+    dimension_ids: List[int] = Field(default_factory=list, description="Dimension ID list")
+    dataset_id: str = Field(default="", description="Dataset ID")
+    dimension_prompt: str = Field(default="", description="Dimension prompt")
+    document_id: str = Field(default="", description="Document ID")
+    
+    # Intermediate state
+    split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="List of split pages")
+    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="List of parse results")
+    
+    # Per-dimension fields written by the skill sub-workflow nodes
+    table_name: Optional[str] = Field(default=None, description="Vector table name for the current dimension")
+    prompt_error: Optional[str] = Field(default=None, description="Prompt retrieval error, if any")
+    table_error: Optional[str] = Field(default=None, description="Table-name generation error, if any")
+    processed_pages: int = Field(default=0, description="Pages parsed for the current dimension")
+    vectorized_pages: int = Field(default=0, description="Pages vectorized for the current dimension")
+    
+    # Output - results per dimension
+    dimension_results: Dict[int, Dict[str, Any]] = Field(default_factory=dict, description="Parse results per dimension")
+    total_vectorized_pages: int = Field(default=0, description="Total number of vectorized pages")
+    is_complete: bool = Field(default=False, description="Whether processing is complete")

+ 11 - 0
src/datasets/parser/workflow_nodes/__init__.py

@@ -0,0 +1,11 @@
+"""
+Workflow node module
+
+Provides workflow-specific node components, such as the dimension skill node.
+"""
+
+from src.datasets.parser.workflow_nodes.dimension_skill_node import DimensionSkillNode
+
+__all__ = [
+    "DimensionSkillNode",
+]

+ 138 - 0
src/datasets/parser/workflow_nodes/dimension_skill_node.py

@@ -0,0 +1,138 @@
+"""
+Dimension skill node
+"""
+
+from typing import Dict, Any
+from src.datasets.parser.core.base import BaseNode
+from src.datasets.parser.core.workflow_builder import WorkflowBuilder
+from src.datasets.parser.nodes import (
+    ImageParseNode, 
+    VectorizeNode, 
+    PromptRetrievalNode, 
+    TableNameGenerationNode, 
+    DimensionResultNode,
+    RAGFlowDocumentUploadNode,
+    RAGFlowChunkNode
+)
+from src.datasets.parser.states.parser_states import DynamicDimensionState
+from src.common.logging_config import get_logger
+
+logger = get_logger(__name__)
+
+
+class DimensionSkillNode(BaseNode):
+    """
+    Dimension skill node
+    
+    Processing node for a single dimension; it builds and runs a sub-workflow that:
+    1. Fetches the dimension prompt
+    2. Generates the vector table name
+    3. Parses the images with the prompt
+    4. Vectorizes and stores the results
+    5. Records the dimension result
+    
+    Each dimension is an independent LangGraph node and can be traced by Langfuse.
+    """
+    
+    def __init__(
+        self,
+        dimension_id: int,
+        model_name: str = "Qwen/Qwen3-VL-8B-Instruct",
+        max_workers: int = 5
+    ):
+        """
+        Initialize the dimension skill node
+        
+        Args:
+            dimension_id: dimension ID
+            model_name: VL model name
+            max_workers: maximum number of worker threads for parallel processing
+        """
+        self.dimension_id = dimension_id
+        self.model_name = model_name
+        self.max_workers = max_workers
+    
+    @property
+    def name(self) -> str:
+        """Node name, format: skill_dim_{id}"""
+        return f"skill_dim_{self.dimension_id}"
+    
+    def _build_sub_workflow(self):
+        """
+        Build the sub-workflow
+        
+        Returns:
+            the compiled LangGraph workflow
+        """
+        logger.info(f"[Skill-{self.dimension_id}] Building sub-workflow")
+
+
+        # Create the workflow builder
+        builder = WorkflowBuilder(DynamicDimensionState)
+        
+        # Create the nodes
+        prompt_node = PromptRetrievalNode(self.dimension_id)
+        document_upload_node = RAGFlowDocumentUploadNode()
+        table_name_node = TableNameGenerationNode(self.dimension_id)
+        parse_node = ImageParseNode(
+            model_name=self.model_name,
+            max_workers=self.max_workers
+        )
+        chunk_node = RAGFlowChunkNode()
+        vectorize_node = VectorizeNode()
+        result_node = DimensionResultNode(self.dimension_id)
+        
+        # Add the nodes
+        builder.add_nodes(
+            prompt_node,
+            document_upload_node,
+            table_name_node,
+            parse_node,
+            chunk_node,
+            vectorize_node,
+            result_node
+        )
+        
+        # Wire the edges
+        builder.set_entry(prompt_node.name)
+        builder.add_edge(prompt_node.name, document_upload_node.name)
+        builder.add_edge(document_upload_node.name, table_name_node.name)
+        builder.add_edge(table_name_node.name, parse_node.name)
+        builder.add_edge(parse_node.name, chunk_node.name)
+        builder.add_edge(chunk_node.name, vectorize_node.name)
+        builder.add_edge(vectorize_node.name, result_node.name)
+        builder.set_finish(result_node.name)
+        
+        # Build and return the workflow
+        workflow = builder.build()
+        logger.info(f"[Skill-{self.dimension_id}] Sub-workflow built")
+        return workflow
+    
+    def execute(self, state: DynamicDimensionState) -> Dict[str, Any]:
+        """
+        Execute the dimension skill
+        
+        Builds and runs the sub-workflow, which:
+        1. Fetches the prompt
+        2. Generates the vector table name
+        3. Parses the images
+        4. Vectorizes and stores the results
+        5. Records the dimension result
+        """
+        logger.info(f"[Skill-{self.dimension_id}] Executing dimension skill")
+        
+        # Build the sub-workflow
+        workflow = self._build_sub_workflow()
+        
+        # Run the sub-workflow
+        result = workflow.invoke(state)
+        
+        # Normalize the result
+        if isinstance(result, dict):
+            final_result = result
+        else:
+            final_result = result.dict() if hasattr(result, 'dict') else dict(result)
+        
+        logger.info(f"[Skill-{self.dimension_id}] Dimension skill complete")
+        
+        return final_result

+ 6 - 172
src/datasets/parser/workflows/dynamic_dimension_workflow.py

@@ -9,168 +9,21 @@
           → pdf_split → skill_dim_1 → skill_dim_3 → skill_dim_5 → complete → END
 """
 
-from typing import Dict, Any, List, Optional
-from langgraph.graph import START, END
+from typing import Dict, Any, List
 from langfuse.langchain import CallbackHandler
 
 from src.datasets.parser.core.workflow_builder import WorkflowBuilder
-from src.datasets.parser.core.base import BaseNode, BaseState
-from src.datasets.parser.states.parser_states import PDFParsingState
+from src.datasets.parser.states.parser_states import DynamicDimensionState
 from src.datasets.parser.nodes import (
     PDFSplitNode,
-    ImageParseNode,
-    VectorizeNode,
-    CompleteNode,
-    RAGFlowDatasetNode,
-    RAGFlowDocumentUploadNode,
-    RAGFlowDocumentParseNode,
+    CompleteNode
 )
-from src.api.db.services.prompt_service import get_prompt_service
+from src.datasets.parser.workflow_nodes import DimensionSkillNode
 from src.common.logging_config import get_logger
 
 logger = get_logger(__name__)
 
 
-class DynamicDimensionState(BaseState):
-    """动态维度解析状态"""
-    # 输入参数
-    pdf_path: str = ""
-    dimension_ids: List[int] = []
-    dataset_id: str = ""
-    dataset_name: str = ""
-    document_id: str = ""
-    
-    # RAGFlow 相关
-    ragflow_api_url: str = ""
-    rag_flow_api_key: str = ""
-    
-    # 中间状态
-    split_pages: List[Dict[str, Any]] = []
-    parsed_results: List[Dict[str, Any]] = []
-    
-    # 输出 - 每个维度的结果
-    dimension_results: Dict[int, Dict[str, Any]] = {}
-    total_vectorized_pages: int = 0
-    is_complete: bool = False
-
-
-class DimensionSkillNode(BaseNode):
-    """
-    维度技能节点
-    
-    单个维度的处理节点,包含:
-    1. 获取维度提示词
-    2. 使用提示词解析图片
-    3. 向量化入库到 book_{dimension_id}
-    
-    每个维度都是独立的 LangGraph 节点,可被 Langfuse 追踪。
-    """
-    
-    def __init__(
-        self,
-        dimension_id: int,
-        model_name: str = "Qwen/Qwen3-VL-8B-Instruct",
-        max_workers: int = 5
-    ):
-        self.dimension_id = dimension_id
-        self.model_name = model_name
-        self.max_workers = max_workers
-        self._prompt_service = None
-        self._prompt = None  # 缓存提示词
-    
-    @property
-    def name(self) -> str:
-        """节点名称,格式: skill_dim_{id}"""
-        return f"skill_dim_{self.dimension_id}"
-    
-    @property
-    def prompt_service(self):
-        """懒加载提示词服务"""
-        if self._prompt_service is None:
-            self._prompt_service = get_prompt_service()
-        return self._prompt_service
-    
-    def _get_table_name(self) -> str:
-        """获取维度对应的表名"""
-        return f"book_{self.dimension_id}"
-    
-    def _get_prompt(self) -> Optional[str]:
-        """获取并缓存维度提示词"""
-        if self._prompt is None:
-            self._prompt = self.prompt_service.get_active_prompt_by_id(self.dimension_id)
-        return self._prompt
-    
-    def execute(self, state: DynamicDimensionState) -> Dict[str, Any]:
-        """
-        执行维度技能
-        
-        1. 获取提示词
-        2. 解析图片
-        3. 向量化入库
-        """
-        logger.info(f"[Skill-{self.dimension_id}] 开始执行维度技能")
-        
-        # 1. 获取提示词
-        prompt = self._get_prompt()
-        if not prompt:
-            logger.warning(f"[Skill-{self.dimension_id}] 没有激活的提示词,跳过")
-            # 更新状态中的维度结果
-            dim_results = dict(getattr(state, 'dimension_results', {}) or {})
-            dim_results[self.dimension_id] = {
-                "dimension_id": self.dimension_id,
-                "skipped": True,
-                "reason": "no_active_prompt"
-            }
-            return {"dimension_results": dim_results}
-        
-        table_name = self._get_table_name()
-        logger.info(f"[Skill-{self.dimension_id}] 表名: {table_name}")
-        
-        # 2. 创建并执行解析节点
-        parse_node = ImageParseNode(
-            model_name=self.model_name,
-            max_workers=self.max_workers,
-            prompt_template=prompt
-        )
-        parse_result = parse_node.execute(state)
-        parsed_results = parse_result.get("parsed_results", [])
-        
-        logger.info(f"[Skill-{self.dimension_id}] 解析完成,共 {len(parsed_results)} 页")
-        
-        # 3. 创建临时状态用于向量化
-        temp_state = DynamicDimensionState(
-            split_pages=getattr(state, 'split_pages', []),
-            parsed_results=parsed_results,
-            document_id=getattr(state, 'document_id', ''),
-            dataset_id=getattr(state, 'dataset_id', ''),
-            pdf_path=getattr(state, 'pdf_path', '')
-        )
-        
-        # 4. 执行向量化
-        vectorize_node = VectorizeNode(table_name=table_name)
-        vectorize_result = vectorize_node.execute(temp_state)
-        vectorized_pages = vectorize_result.get('vectorized_pages', 0)
-        
-        logger.info(f"[Skill-{self.dimension_id}] 入库完成,共 {vectorized_pages} 页到 {table_name}")
-        
-        # 5. Update the state
-        dim_results = dict(getattr(state, 'dimension_results', {}) or {})
-        dim_results[self.dimension_id] = {
-            "dimension_id": self.dimension_id,
-            "table_name": table_name,
-            "parsed_pages": parse_result.get("processed_pages", 0),
-            "vectorized_pages": vectorized_pages,
-            "success": True
-        }
-        
-        total_vectorized = getattr(state, 'total_vectorized_pages', 0) + vectorized_pages
-        
-        return {
-            "dimension_results": dim_results,
-            "total_vectorized_pages": total_vectorized
-        }
-
-
 class DynamicDimensionWorkflow:
     """
     Dynamic multi-dimension parsing workflow (LangGraph dynamic graph construction)
@@ -215,9 +68,6 @@ class DynamicDimensionWorkflow:
         logger.info(f"动态构建工作流,维度: {dimension_ids}")
         
         # 创建固定节点
-        dataset_node = RAGFlowDatasetNode(create_if_not_exists=True)
-        upload_node = RAGFlowDocumentUploadNode(target_field="document_id")
-        parse_doc_node = RAGFlowDocumentParseNode()
         split_node = PDFSplitNode()
         complete_node = CompleteNode(message_template="Dynamic multi-dimension parsing complete")
         
@@ -226,18 +76,12 @@ class DynamicDimensionWorkflow:
         
         # Add the fixed nodes
         builder.add_nodes(
-            dataset_node,
-            upload_node,
-            parse_doc_node,
             split_node,
             complete_node
         )
         
-        # Fixed edges: START → ragflow_dataset → upload → parse → pdf_split
-        builder.set_entry("ragflow_dataset")
-        builder.add_edge("ragflow_dataset", "ragflow_document_upload")
-        builder.add_edge("ragflow_document_upload", "ragflow_document_parse")
-        builder.add_edge("ragflow_document_parse", "pdf_split")
+        # Fixed edge: START → pdf_split
+        builder.set_entry("pdf_split")
         
         # Dynamically add one skill node per dimension
         prev_node = "pdf_split"
@@ -266,11 +110,6 @@ class DynamicDimensionWorkflow:
         self,
         pdf_path: str,
         dimension_ids: List[int],
-        dataset_id: str = "",
-        dataset_name: str = "",
-        document_id: str = "",
-        ragflow_api_url: str = "",
-        rag_flow_api_key: str = ""
     ) -> Dict[str, Any]:
         """
         Run the dynamic multi-dimension parsing workflow
@@ -305,11 +144,9 @@ class DynamicDimensionWorkflow:
         initial_state = DynamicDimensionState(
             pdf_path=pdf_path,
             dimension_ids=dimension_ids,
-            dataset_id=dataset_id,
-            dataset_name=dataset_name or pdf_path.split("/")[-1].split("\\")[-1].replace(".pdf", ""),
-            document_id=document_id,
-            ragflow_api_url=ragflow_api_url,
-            rag_flow_api_key=rag_flow_api_key,
+            # dataset_name=dataset_name or pdf_path.split("/")[-1].split("\\")[-1].replace(".pdf", ""),
+            # ragflow_api_url=ragflow_api_url,
+            # rag_flow_api_key=rag_flow_api_key,
             dimension_results={},
             total_vectorized_pages=0
         )

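With the RAGFlow dataset/upload/parse nodes removed, the graph now starts directly at pdf_split and run() takes only a path and a dimension list. A minimal call-site sketch, assuming the constructor is otherwise unchanged by this commit (the path and dimension ids below are placeholders):

    # Hypothetical usage; dimension ids 1 and 2 are placeholders.
    workflow = DynamicDimensionWorkflow()
    result = workflow.run(
        pdf_path="/data/books/sample.pdf",       # placeholder path
        dimension_ids=[1, 2],                    # one skill_dim_{id} node is built per id
    )
    print(result.get("total_vectorized_pages"))  # pages stored across the book_{id} tables
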
+ 13 - 6
src/datasets/parser/workflows/pdf_workflow.py

@@ -3,8 +3,8 @@ PDF parsing workflow V2
 
 A PDF parsing workflow built from componentized nodes.
 """
-
-from typing import Dict, Any
+import io
+from typing import Dict, Any, Optional
 from langgraph.graph import START, END
 from langfuse.langchain import CallbackHandler
 
@@ -61,8 +61,8 @@ class PDFParsingWorkflowV2:
         # 创建节点实例
         dataset_node = RAGFlowDatasetNode(create_if_not_exists=True)
         dataset_condition = DatasetExistsCondition()
-        upload_node = RAGFlowDocumentUploadNode(target_field="document_id")
-        page_upload_node = RAGFlowDocumentUploadNode(target_field="page_document_id")
+        upload_node = RAGFlowDocumentUploadNode()
+        page_upload_node = RAGFlowDocumentUploadNode()
         parse_doc_node = RAGFlowDocumentParseNode()
         split_node = PDFSplitNode()
         image_parse_node = ImageParseNode(model_name=self.model_name)
@@ -99,22 +99,29 @@ class PDFParsingWorkflowV2:
         
         return builder.build()
     
-    def run(self, pdf_path: str, page_dataset_id: str, dataset_name: str) -> Dict[str, Any]:
+    def run(self, pdf_path: str = "", pdf_content: Optional[io.BytesIO] = None, original_filename: str = "", page_dataset_id: str = "", dataset_name: str = "") -> Dict[str, Any]:
         """
         Run the PDF parsing workflow
         
         Args:
             pdf_path: Path to the PDF file
+            pdf_content: PDF file content (byte stream)
+            original_filename: Original file name
             page_dataset_id: Page dataset ID
             dataset_name: Dataset name
             
         Returns:
             Dict containing the final state
         """
-        logger.info(f"Starting PDF parsing workflow: {pdf_path}")
+        if pdf_content:
+            logger.info(f"Starting PDF parsing workflow: {original_filename or 'in-memory PDF'}")
+        else:
+            logger.info(f"Starting PDF parsing workflow: {pdf_path}")
         
         initial_state = PDFParsingState(
             pdf_path=pdf_path,
+            pdf_content=pdf_content,
+            original_filename=original_filename,
             page_dataset_id=page_dataset_id,
             dataset_name=dataset_name
         )

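The reworked run() signature lets callers hand over an in-memory buffer instead of a filesystem path. A sketch under the assumption that PDFParsingWorkflowV2 still needs no required constructor arguments (the ids and file below are placeholders):

    import io

    workflow = PDFParsingWorkflowV2()
    with open("sample.pdf", "rb") as f:       # stand-in for bytes from an HTTP upload
        buffer = io.BytesIO(f.read())
    result = workflow.run(
        pdf_content=buffer,
        original_filename="sample.pdf",       # forwarded as the document name
        page_dataset_id="ds_pages_001",       # placeholder dataset id
        dataset_name="sample",
    )
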
+ 23 - 7
src/utils/http_client.py

@@ -2,6 +2,7 @@ import requests
 import logging
 import os
 import json
+import io
 from typing import Dict, Any, Optional
 from urllib3.util.retry import Retry
 from requests.adapters import HTTPAdapter
@@ -355,7 +356,8 @@ class HTTPClient:
             logger.error(f"Request failed: {str(e)}")
             raise
     
-    def upload_file(self, endpoint: str, file_path: str, file_field_name: str = 'file',
+    def upload_file(self, endpoint: str, file_path: Optional[str] = None, file_content: Optional[io.BytesIO] = None,
+                   file_field_name: str = 'file', original_filename: Optional[str] = None,
                    data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]:
         """
         Upload a file
@@ -363,7 +365,9 @@ class HTTPClient:
         Args:
             endpoint: API endpoint path (starting with /)
             file_path: Local file path
+            file_content: File content (byte stream); used preferentially when provided
             file_field_name: Form field name
+            original_filename: Original file name; used preferentially when provided
             data: Extra form data
             headers: Custom request headers
             
@@ -373,11 +377,23 @@ class HTTPClient:
         Raises:
             requests.exceptions.RequestException: Raised when the request fails
         """
-        # Open the file and build the files dict
-        with open(file_path, 'rb') as f:
+        # Build the files dict
+        if file_content:
+            # Upload from the byte stream
+            filename = original_filename if original_filename else "uploaded_file.pdf"
             files = {
-                file_field_name: (os.path.basename(file_path), f)
+                file_field_name: (filename, file_content)
             }
-            
-            # Send the POST request
-            return self.post(endpoint, data=data, files=files, headers=headers)
+        else:
+            # Upload from the file path
+            with open(file_path, 'rb') as f:
+                filename = original_filename if original_filename else os.path.basename(file_path)
+                files = {
+                    file_field_name: (filename, f)
+                }
+                
+                # POST while the file handle is still open, hence the early return
+                return self.post(endpoint, data=data, files=files, headers=headers)
+        
+        # Send the POST request (byte-stream path)
+        return self.post(endpoint, data=data, files=files, headers=headers)

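Both upload modes of the extended helper, sketched side by side; the endpoint and the HTTPClient constructor are not part of this diff, so both are assumptions here:

    import io

    client = HTTPClient(base_url="http://ragflow:9380")  # assumed constructor
    # Path mode: the filename defaults to the basename unless overridden.
    client.upload_file("/api/v1/files", file_path="/tmp/report.pdf")
    # Stream mode: pass original_filename, otherwise the server sees the
    # hard-coded fallback "uploaded_file.pdf".
    buf = io.BytesIO(b"%PDF-1.7 ...")                    # placeholder bytes
    client.upload_file("/api/v1/files", file_content=buf,
                       original_filename="report.pdf")
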
+ 70 - 0
src/utils/markdown_utils.py

@@ -0,0 +1,70 @@
+"""
+Markdown工具类
+
+提供处理markdown格式内容的工具函数。
+"""
+
+import json
+from typing import Any, Dict, Optional
+
+
+def parse_markdown_json(content: str) -> Optional[Dict[str, Any]]:
+    """
+    解析markdown格式的JSON内容
+    
+    从markdown格式的字符串中提取并解析JSON内容,去除 ```json 和 ``` 标签。
+    
+    Args:
+        content: 包含markdown格式JSON的字符串
+        
+    Returns:
+        解析后的JSON字典,如果解析失败返回None
+    """
+    if not content:
+        return None
+    
+    # 去除首尾空白
+    stripped_content = content.strip()
+    
+    # 检查是否以 ```json 开头并以 ``` 结尾
+    if stripped_content.startswith("```json") and stripped_content.endswith("```"):
+        # 提取 ```json 和 ``` 之间的内容
+        json_content = stripped_content.replace("```json", "", 1)
+        json_content = json_content.rstrip("```").strip()
+        
+        try:
+            # 解析为JSON
+            return json.loads(json_content)
+        except json.JSONDecodeError:
+            # 解析失败
+            return None
+    
+    return None
+
+
+def extract_json_from_markdown(content: str) -> str:
+    """
+    从markdown字符串中提取JSON内容
+    
+    去除 ```json 和 ``` 标签,返回纯JSON字符串。
+    
+    Args:
+        content: 包含markdown格式JSON的字符串
+        
+    Returns:
+        纯JSON字符串,如果没有找到JSON标签返回原始内容
+    """
+    if not content:
+        return content
+    
+    # 去除首尾空白
+    stripped_content = content.strip()
+    
+    # 检查是否以 ```json 开头并以 ``` 结尾
+    if stripped_content.startswith("```json") and stripped_content.endswith("```"):
+        # 提取 ```json 和 ``` 之间的内容
+        json_content = stripped_content.replace("```json", "", 1)
+        json_content = json_content.rstrip("```").strip()
+        return json_content
+    
+    return content

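A quick self-check of the two helpers on a made-up model response:

    sample = '''```json
    {"summary": "chapter overview", "page": 12}
    ```'''

    parse_markdown_json(sample)
    # -> {'summary': 'chapter overview', 'page': 12}
    extract_json_from_markdown(sample)
    # -> '{"summary": "chapter overview", "page": 12}'
    parse_markdown_json('{"no": "fence"}')
    # -> None: only ```json-fenced input is parsed
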
+ 41 - 0
src/utils/ragflow/dataset_service.py

@@ -1,3 +1,4 @@
+import io
 from typing import Dict, Any, List, Optional
 
 class DatasetService:
@@ -179,3 +180,43 @@ class DatasetService:
             return response.get("data", {})
         else:
             raise Exception(f"运行RAPTOR失败: {response.get('message', '未知错误')}")
+    
+    def upload_document(self, dataset_id: str, file_path: str = None, 
+                       file_content: io.BytesIO = None, original_filename: str = None,
+                       metadata: Dict = None) -> Dict[str, Any]:
+        """
+        上传文档到数据集
+        
+        Args:
+            dataset_id: 数据集ID
+            file_path: 本地文件路径
+            file_content: 文件内容(字节流),如果提供则优先使用
+            original_filename: 原始文件名,如果提供则优先使用
+            metadata: 文档元数据
+            
+        Returns:
+            Dict: 响应JSON数据
+        
+        Raises:
+            Exception: 上传失败时抛出
+        """
+        endpoint = f"/api/v1/datasets/{dataset_id}/documents"
+        
+        # 构建表单数据
+        data = {}
+        if metadata:
+            data["metadata"] = metadata
+        
+        # 使用http_client的upload_file方法上传文件
+        response = self.http_client.upload_file(
+            endpoint=endpoint,
+            file_path=file_path,
+            file_content=file_content,
+            original_filename=original_filename,
+            data=data
+        )
+        
+        if response.get("code") == 0 and response.get("data"):
+            return response["data"]
+        else:
+            raise Exception(f"上传文档失败: {response.get('message', '未知错误')}")