Knowledge base upload

yingge, 3 months ago
Parent
Current commit
fb3a29ffab

+ 5 - 3
.env

@@ -1,9 +1,10 @@
 # 模型配置
 MODEL_PROVIDER=openai
 MODEL_NAME=Qwen/Qwen3-VL-8B-Instruct
+CHAT_MODEL_NAME=deepseek-ai/DeepSeek-V3.2
 BASE_URL=https://api.siliconflow.cn/v1
 API_KEY=sk-xvrfniafyxprllrgedsgosdwcmfmrbnrvhhztssqsmnzacfj
-DASHSCOPE=sk-bc0f1026a41c4c92beb014be8973e4e2
+DASHSCOPE_API_KEY=sk-bc0f1026a41c4c92beb014be8973e4e2
 # embedding模型配置
 EMBEDDING_MODEL_NAME=Qwen/Qwen3-Embedding-0.6B
 MULTIMODAL_EMBEDDING_MODEL_NAME=qwen2.5-vl-embedding
@@ -12,7 +13,7 @@ RANK_MODEL_NAME=Qwen/Qwen3-Reranker-0.6B
 
 # RAGFLOW配置
 RAGFLOW_API_URL=http://192.168.16.134:9380/
-RAGFLOW_API_KEY=ragflow-sPJ06xiUdRrcfDRlOD-GN2gl-U2DLB-PbgNGckUu0KM
+RAGFLOW_API_KEY=ragflow-HZpOgDg9Vguv2qFRJ4Du7Fo0Lu3AmLaL-Ta0G_R28Y4
 DATASET_ID=a0f1aa03ed2c11f08b8f0242c0a85002
 RAGFLOW_USER_NAME=O75u85uh+PwmwmJvNebYUCNKpD812xhfnQOvB+Mwy+cHQtGBV2dy0tMQKQwGHiW7MiLJkHPqSLn7ULTzav0c2w3yIze71PAcIfxUScautg6xMMgtjHd4ex8peVyXTQcWc0bmD+GxADaZoOMnDz/XNUtB8mggx/VZ1RBdhrZMylTEGQUcUE8ylbLAVgDVOR6iqJEjGNZYoWUFiuYZAB6bIfPdaPG8Kr0KQrqHj3Y0zZHXl92AloXnw5RsaBOc3p01PKc+xYujrb+mdSGs8flzNHMusWhB/0bJ+t0XxqgePymCCk6+7nmm7M6iZ6pRlYEeERsjiNd/1lcMaWxda5jv4A==
 RAGFLOW_PASSWD=ggR2p1L7DRp5v+VcfikmfMYWh99QgBjtQV1fexSFP5C18HVXhkTD2MeMrcnJft8ciGeRUPDStaYZft3CbHOBY3Bzm2+/WX8iVuZ+5kMkmiAL9iUu0RxK74g1x11zy2CAoASy344ZiDy1p1co7hE0ksPdL62U38dxPGPaK8ngqfTsMvpb0XztWBUxaK5Izl3fqPKaYp+eqC75vvj6PeeaajIc7I7dqSai3a6jipruZeA8VxF/cWjVSKubQrm/YLs8mge2mcSM0pyUU2t9LeNS+hOiUyxRAqE2s+yGmRpeCMKIcfeGaurlVesy8v7zjIjZZBbEPV8rSZOEskGRSAG38Q==
@@ -32,6 +33,7 @@ INFINITY_DATABASE=book_image_db
 INFINITY_USER=admin
 INFINITY_PASSWORD=admin
 INFINITY_TABLE_NAME=book_page_image
+INFINITY_PAGE_DATASET_ID=90d73295f02411f0a76b0242c0a85002
 
 # MySQL配置
 MYSQL_HOST=192.168.16.134
@@ -55,7 +57,7 @@ TAG_DB_NAME=default_db
 TAG_DATASET_ID=18caf531f04d11f095670242c0a85002
 TAG_DOCUMENT_ID=3dda0a90f1e211f0a3b80242c0a85002
 #TAG_TABLE_NAME=ragflow_92162247e93e11f084830242ac1d0002_52275b36f03611f0a5340242c0a85002
-TAG_TABLE_NAME=ragflow_92162247e93e11f084830242ac1d0002_18caf531f04d11f095670242c0a85002
+TAG_TABLE_NAME=ragflow_92162247e93e11f084830242ac1d0002_6d2e0990f28b11f0b5200242c0a85002
 
 # LANGFUSE
 LANGFUSE_PUBLIC_KEY=pk-lf-6918a148-be72-4211-a22d-183a23e6643e

File diff too large to display
+ 138 - 0
HTTP API.md


+ 0 - 0
api/dataset/__init__.py


+ 0 - 0
api/dataset/services/__init__.py


+ 56 - 0
api/dataset/services/dataset_manage_service.py

@@ -0,0 +1,56 @@
+"""
+数据集管理服务
+
+该文件提供数据集管理功能,支持:
+- PDF文件解析
+- 数据集创建和管理
+- 调用PDF解析工作流
+"""
+
+import os
+import tempfile
+from typing import Dict, Any, Optional
+from parser.pdf_parser.pdf_parser_workflow import PDFParsingWorkflow
+from conf.settings import vector_db_settings
+
+
+class DatasetManageService:
+    """数据集管理服务类"""
+    
+    def __init__(self):
+        """初始化数据集管理服务"""
+        self.pdf_workflow = PDFParsingWorkflow()
+    
+    def parse_pdf(self, series_name: str, pdf_file: bytes, pdf_filename: str) -> Dict[str, Any]:
+        """
+        解析PDF文件
+        
+        Args:
+            series_name: 系列名
+            pdf_file: PDF文件字节数据
+            pdf_filename: PDF文件名
+            
+        Returns:
+            Dict[str, Any]: 解析结果
+        """
+        try:
+            # 创建临时文件,使用原始文件名称
+            temp_dir = tempfile.gettempdir()
+            temp_file_path = os.path.join(temp_dir, pdf_filename)
+            with open(temp_file_path, 'wb') as temp_file:
+                temp_file.write(pdf_file)
+            
+            try:
+                # 运行PDF解析工作流
+                result = self.pdf_workflow.run(
+                    pdf_path=temp_file_path,
+                    page_dataset_id=vector_db_settings.infinity_page_dataset_id,
+                    dataset_name=series_name
+                )
+                
+                return result
+            finally:
+                # 删除临时文件
+                os.unlink(temp_file_path)
+        except Exception as e:
+            raise Exception(f"解析PDF文件失败: {str(e)}")

+ 0 - 0
api/db/__init__.py


+ 0 - 0
api/sdk/__init__.py


+ 56 - 0
api/sdk/dataset_manage.py

@@ -0,0 +1,56 @@
+"""
+数据集管理 API
+
+该文件提供数据集管理的 API 接口,支持:
+- PDF 文件上传和解析
+- 数据集创建
+"""
+
+from fastapi import FastAPI, HTTPException, UploadFile, File, Form
+from typing import Dict, Any
+from api.dataset.services.dataset_manage_service import DatasetManageService
+
+
+# 创建 FastAPI 应用
+app = FastAPI(
+    title="数据集管理 API",
+    description="数据集管理服务,提供 PDF 解析和数据集创建功能",
+    version="1.0.0"
+)
+
+# 创建数据集管理服务实例
+dataset_service = DatasetManageService()
+
+
+@app.post("/parse-pdf", response_model=Dict[str, Any])
+async def parse_pdf(
+    file: UploadFile = File(...),
+    series_name: str = Form(...)
+):
+    """
+    解析 PDF 文件接口
+    
+    - **file**: PDF 文件附件
+    - **series_name**: 系列名
+    """
+    try:
+        # 验证文件格式
+        if not file.filename.endswith((".pdf", ".PDF")):
+            raise HTTPException(status_code=400, detail="只支持 PDF 格式的文件")
+        
+        # 读取文件内容
+        file_content = await file.read()
+        
+        # 调用解析 PDF 方法
+        result = dataset_service.parse_pdf(
+            series_name=series_name,
+            pdf_file=file_content,
+            pdf_filename=file.filename
+        )
+        
+        return {"success": True, "result": result}
+    except HTTPException as e:
+        raise e
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"解析 PDF 文件失败: {str(e)}")
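
Client-side sketch for exercising the endpoint (assumes the merged app from main.py is served at http://localhost:8000 and that requests is installed; since the sub-app is mounted at /dataset, the full route is /dataset/parse-pdf; host, port, and file name are assumptions):

import requests

with open("sample.pdf", "rb") as f:  # hypothetical PDF
    resp = requests.post(
        "http://localhost:8000/dataset/parse-pdf",
        files={"file": ("sample.pdf", f, "application/pdf")},
        data={"series_name": "demo_series"},
    )
print(resp.status_code, resp.json())  # {"success": true, "result": ...} on success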

+ 8 - 8
conf/age_level.json

@@ -1,10 +1,10 @@
 {
-    "L1": "0-2",
-    "L2": "2-3",
-    "L3": "3-4",
-    "L4": "4-5",
-    "L5": "5-6",
-    "L6": "6-10",
-    "L7": "10-14",
-    "L8": "14-100"
+    "L1": [0, 1 ,2],
+    "L2": [2, 3],
+    "L3": [3, 4],
+    "L4": [4, 5],
+    "L5": [5, 6],
+    "L6": [6, 7, 8, 9, 10],
+    "L7": [10, 11, 12, 13, 14],
+    "L8": [14, 15, 16, 17, 18, 19, 20]
 }
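
Switching from range strings like "0-2" to explicit integer lists lets consumers test membership directly instead of parsing ranges first; a minimal sketch (the helper function is illustrative, not part of the repo):

import json

with open("conf/age_level.json", encoding="utf-8") as f:
    age_levels = json.load(f)

def levels_for_age(age: int) -> list[str]:
    # Every level whose age list contains the given age.
    return [level for level, ages in age_levels.items() if age in ages]

print(levels_for_age(3))  # ['L2', 'L3'] -- boundary ages now belong to two levels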

+ 1 - 1
conf/config.py

@@ -67,7 +67,7 @@ class ModelConfig:
     @staticmethod
     def get_dashscope_api_key() -> str:
         """获取DASHSCOPE API密钥"""
-        return os.getenv("DASHSCOPE", "")
+        return os.getenv("DASHSCOPE_API_KEY", "")
 
 class RagflowConfig:
     """RAGFLOW配置类"""

+ 46 - 0
conf/rag_parser_config.py

@@ -0,0 +1,46 @@
+class RagParserDefaults:
+
+    DATASET_PERMISSION="team"
+
+    DATASET_CHUNK_METHOD="naive"
+
+    DATASET_CONFIG_DICT = {
+            "chunk_token_num": 256,
+            "delimiter": "\n!?;。;!?",
+            "html4excel": False,
+            "layout_recognize": "Pro/Qwen/Qwen2.5-VL-7B-Instruct@SILICONFLOW",
+            "auto_keywords": 5,
+            "tag_kb_ids": [],
+            "topn_tags": 3,
+            "task_page_size": 4,
+            "raptor": {
+                "max_cluster": 64,
+                "max_token": 256,
+                "prompt": "Please summarize the following paragraphs. Be careful with the numbers, do not make things up. Paragraphs as following:\n      {cluster_content}\nThe above is the content you need to summarize.",
+                "random_seed": 0,
+                "threshold": 0.1,
+                "use_raptor": True
+            },
+            "graphrag": {
+                "resolution": True,
+                "use_graphrag": True,
+                "method": "general",
+                "entity_types": [
+                    "event",
+                    "Book",
+                    "Author",
+                    "Illustrator",
+                    "Series",
+                    "Theme",
+                    "Genre",
+                    "Character",
+                    "Setting",
+                    "AgeGroup",
+                    "Competency",
+                    "ArtStyle",
+                    "Award",
+                    "Publisher",
+                    "Role"
+                ]
+            }
+        }
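
These defaults are consumed by create_ragflow_dataset in pdf_parser_workflow.py; an equivalent standalone call looks like this (the dataset name is hypothetical):

from conf.rag_parser_config import RagParserDefaults
from utils.ragflow.ragflow_service import RAGFlowService

service = RAGFlowService()
dataset = service.create_dataset(
    name="demo_dataset",  # hypothetical name
    description="",
    permission=RagParserDefaults.DATASET_PERMISSION,
    chunk_method=RagParserDefaults.DATASET_CHUNK_METHOD,
    parser_config=RagParserDefaults.DATASET_CONFIG_DICT,
)
print(dataset["id"])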

+ 3 - 1
conf/settings.py

@@ -18,12 +18,13 @@ class ModelSettings(BaseSettings):
     """模型配置类"""
     model_provider: str = Field(default="openai", alias="MODEL_PROVIDER")
     model_name: str = Field(default="Qwen/Qwen3-VL-8B-Instruct", alias="MODEL_NAME")
+    chat_model_name: str = Field(default="deepseek-ai/DeepSeek-V3.2", alias="CHAT_MODEL_NAME")
     embedding_model_name: str = Field(default="Qwen/Qwen3-Embedding-0.6B", alias="EMBEDDING_MODEL_NAME")
     base_url: str = Field(default="https://api.openai.com/v1", alias="BASE_URL")
     api_key: str = Field(default="", alias="API_KEY")
     rank_model_name: str = Field(default="Qwen/Qwen3-Reranker-0.6B", alias="RANK_MODEL_NAME")
     multimodal_embedding_model_name: str = Field(default="qwen2.5-vl-embedding", alias="MULTIMODAL_EMBEDDING_MODEL_NAME")
-    dashscope: str = Field(default="", alias="DASHSCOPE")
+    dashscope_api_key: str = Field(default="", alias="DASHSCOPE_API_KEY")
     
     model_config = SettingsConfigDict(
         env_file=".env",
@@ -72,6 +73,7 @@ class VectorDBSettings(BaseSettings):
     infinity_password: str = Field(default="admin", alias="INFINITY_PASSWORD")
     infinity_database: str = Field(default="test", alias="INFINITY_DATABASE")
     infinity_table_name: str = Field(default="test", alias="INFINITY_TABLE_NAME")
+    infinity_page_dataset_id: str = Field(default="", alias="INFINITY_PAGE_DATASET_ID")
     
     model_config = SettingsConfigDict(
         env_file=".env",

+ 5 - 1
main.py

@@ -6,6 +6,7 @@ from contextlib import asynccontextmanager
 # 导入所有子应用
 from api.search_infinity import app as search_app
 from api.tag_manage import app as tag_app
+from api.sdk.dataset_manage import app as dataset_app
 
 # 定义主应用的生命周期管理
 @asynccontextmanager
@@ -47,6 +48,8 @@ main_app = FastAPI(
 main_app.mount("/search", search_app, name="search_api")
 # 2. 标签管理 API - 访问路径: /tag/*
 main_app.mount("/tag", tag_app, name="tag_api")
+# 3. 数据集管理 API - 访问路径: /dataset/*
+main_app.mount("/dataset", dataset_app, name="dataset_api")
 
 # 主应用根路径
 @main_app.get("/")
@@ -57,7 +60,8 @@ async def root():
         "available_apps": {
             "search_api": "访问路径: /search, 文档: /search/docs",
             "hybrid_http_api": "访问路径: /hybrid, 文档: /hybrid/docs",
-            "tag_api": "访问路径: /tag, 文档: /tag/docs"
+            "tag_api": "访问路径: /tag, 文档: /tag/docs",
+            "dataset_api": "访问路径: /dataset, 文档: /dataset/docs"
         }
     }
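
Smoke-test sketch for the new mount (assumes uvicorn is serving main_app on port 8000; the port is an assumption, it is not shown in this diff):

import requests

apps = requests.get("http://localhost:8000/").json()["available_apps"]
print(apps["dataset_api"])  # 访问路径: /dataset, 文档: /dataset/docs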
 

+ 109 - 0
model/openai_chat_model.py

@@ -0,0 +1,109 @@
+from typing import Optional, Dict, Any
+from langchain.chat_models import init_chat_model
+from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
+from conf.settings import model_settings
+
+class OpenAIChatModel:
+    """
+    OpenAI 风格的聊天模型封装
+    
+    基于 langchain 的 init_chat_model 实现,
+    默认使用配置中的 CHAT_MODEL_NAME 模型,支持用户指定其他模型。
+    """
+    
+    def __init__(
+        self,
+        model_provider: str = model_settings.model_provider,
+        model_name: str = model_settings.chat_model_name,
+        api_key: Optional[str] = model_settings.api_key,
+        base_url: Optional[str] = model_settings.base_url,
+        temperature: float = 0.7,
+        max_tokens: Optional[int] = None,
+        **kwargs
+    ):
+        """
+        初始化 OpenAI Chat 模型
+        
+        Args:
+            model_provider: 模型提供方,默认取自配置 MODEL_PROVIDER
+            model_name: 模型名称,默认取自配置 CHAT_MODEL_NAME
+            api_key: API 密钥
+            base_url: API 基础 URL
+            temperature: 生成文本的随机性,范围 0-2,默认为 0.7
+            max_tokens: 最大生成 token 数
+            **kwargs: 其他参数
+        """
+        # 使用 langchain 的 init_chat_model 初始化模型
+        self.chat_model = init_chat_model(
+            model_provider=model_provider,
+            model=model_name,
+            api_key=api_key,
+            base_url=base_url,
+            temperature=temperature,
+            max_tokens=max_tokens,
+            **kwargs
+        )
+    
+    def get_chat_model(self):
+        """
+        获取聊天模型实例
+        
+        Returns:
+            聊天模型实例(由 langchain.init_chat_model 返回的类型)
+        """
+        return self.chat_model
+    
+    def generate_response(
+        self,
+        prompt: str,
+        system_prompt: Optional[str] = None,
+        **kwargs
+    ) -> str:
+        """
+        生成响应
+        
+        Args:
+            prompt: 用户提示
+            system_prompt: 系统提示
+            **kwargs: 其他参数
+            
+        Returns:
+            str: 生成的响应
+        """
+        # 构建消息列表
+        messages = []
+        
+        # 添加系统提示(如果有)
+        if system_prompt:
+            messages.append(SystemMessage(content=system_prompt))
+        
+        # 添加用户提示
+        messages.append(HumanMessage(content=prompt))
+        
+        # 生成响应
+        response = self.chat_model.invoke(messages, **kwargs)
+        
+        # 解析响应
+        if isinstance(response, AIMessage):
+            return response.content
+        else:
+            # 对于其他类型的响应,尝试获取内容
+            return str(response)
+    
+    def chat(
+        self,
+        prompt: str,
+        system_prompt: Optional[str] = None,
+        **kwargs
+    ) -> str:
+        """
+        聊天接口(别名,向后兼容)
+        
+        Args:
+            prompt: 用户提示
+            system_prompt: 系统提示
+            **kwargs: 其他参数
+            
+        Returns:
+            str: 生成的响应
+        """
+        return self.generate_response(prompt, system_prompt, **kwargs)
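
Usage sketch (relies on MODEL_PROVIDER, BASE_URL, API_KEY, and CHAT_MODEL_NAME from .env; the prompts are placeholders):

from model.openai_chat_model import OpenAIChatModel

chat_model = OpenAIChatModel(temperature=0.3)
answer = chat_model.generate_response(
    prompt="Summarize what RAG is in one sentence.",
    system_prompt="You are a concise technical assistant.",
)
print(answer)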

+ 1 - 1
model/qwen_vl.py

@@ -4,7 +4,7 @@ import base64
 import io
 from langchain.chat_models import init_chat_model
 from conf.settings import model_settings
-from langfuse.callback import CallbackHandler
+from langfuse.langchain import CallbackHandler
 
 class QWenVLParser:
     """QWEN VL模型图像解析工具"""

+ 115 - 67
parser/pdf_parser/pdf_parser_workflow.py

@@ -1,30 +1,29 @@
 import os
 import concurrent.futures
+import time
 from concurrent.futures import ThreadPoolExecutor
 from langgraph.graph import StateGraph, START, END
 from typing import List, Dict, Any
 from pydantic import BaseModel, Field, ConfigDict
 from parser.pdf_parser.pdf_splitter import PDFSplitter
 from model.qwen_vl import QWenVLParser
-from utils.ragflow_sdk import DataSetUtil, DocumentUtil, ChunkUtil
 from utils.ragflow.ragflow_service import RAGFlowService
 from model.multimodal_embedding import Embedding
 from conf.settings import model_settings, vector_db_settings
 from utils.infinity import get_client
+from langfuse.langchain import CallbackHandler
+from conf.rag_parser_config import RagParserDefaults
 
 # 定义工作流状态类
 class PDFParsingState(BaseModel):
     """PDF解析工作流状态"""
     model_config = ConfigDict(arbitrary_types_allowed=True)
     pdf_path: str = Field(..., description="PDF文件路径")
-    dataset_id: str = Field(..., description="数据集ID")
-    page_dataset_id: str = Field(..., description="页面数据集ID")
-    ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFlow服务实例")
-    dataset_util: DataSetUtil = Field(default_factory=DataSetUtil, description="数据集工具类实例")
-    document_util: DocumentUtil = Field(default_factory=DocumentUtil, description="文档工具类实例")
-    chunk_util: ChunkUtil = Field(default_factory=ChunkUtil, description="文档工具类实例")
-    embedding_model: Embedding = Field(default_factory=Embedding, description="多模态嵌入模型实例")
+    dataset_name: str = Field(..., description="数据集名称")
+    dataset_id: str = Field(default="", description="RAGFLOW数据集ID")
     document_id: str = Field(default="", description="上传后的文档ID")
+    page_dataset_id: str = Field(..., description="页面数据集ID")
+    page_document_id: str = Field(default="", description="上传后的页面文档ID")
     split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
     current_page: Dict[str, Any] = Field(default_factory=dict, description="当前处理的页面")
     parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表")
@@ -46,6 +45,9 @@ class PDFParsingWorkflow:
         """
         self.model_name = model_name
         self.workflow = self._build_workflow()
+        self.ragflow_service = RAGFlowService()
+        self.langfuse_handler = CallbackHandler()
+        self.embedding_model = Embedding(model_name=model_settings.multimodal_embedding_model_name, api_key=model_settings.dashscope_api_key)
         
     
     def _build_workflow(self):
@@ -53,8 +55,17 @@ class PDFParsingWorkflow:
         # 创建状态图
         graph = StateGraph(PDFParsingState)
         
+        # 添加查询知识库是否存在节点
+        graph.add_node("get_ragflow_dataset", self.get_ragflow_dataset)
+
+        # 添加创建知识库节点
+        graph.add_node("create_ragflow_dataset", self.create_ragflow_dataset)
+
         # 添加上传文档节点
         graph.add_node("upload_document", self._upload_document_node)
+
+        # 添加上传图书页面文档节点
+        graph.add_node("upload_page_document", self._upload_page_document_node)
         
         # 添加解析文档节点
         graph.add_node("parse_document", self._parse_document_node)
@@ -64,6 +75,9 @@ class PDFParsingWorkflow:
         
         # 添加解析图像节点
         graph.add_node("parse_image", self._parse_image_node)
+
+        # 添加创建RAGFlow Chunk节点
+        graph.add_node("create_ragflow_chunk", self.create_ragflow_chunk)
         
         # 添加向量化入库节点
         graph.add_node("vectorize_store", self._vectorize_store_node)
@@ -72,10 +86,21 @@ class PDFParsingWorkflow:
         graph.add_node("complete", self._complete_node)
         
         # 定义边
-        # 定义RagFLow解析文档
-        graph.add_edge(START, "upload_document")
+        # 查询知识库是否存在
+        graph.add_edge(START, "get_ragflow_dataset")
+        # 添加条件边,判断知识库是否存在
+        graph.add_conditional_edges(
+            "get_ragflow_dataset",
+            self._check_dataset_exists,
+            {
+                "exists": "upload_document",
+                "not_exists": "create_ragflow_dataset"
+            }
+        )
         # 添加解析文档边
+        graph.add_edge("create_ragflow_dataset", "upload_document")
         graph.add_edge("upload_document", "parse_document")
+        graph.add_edge("upload_document", "upload_page_document")
         graph.add_edge("parse_document", "split_pdf")
         # 定义图片解析边
         graph.add_edge("split_pdf", "parse_image")
@@ -86,45 +111,55 @@ class PDFParsingWorkflow:
             self._should_continue_parsing,
             {
                 "continue": "parse_image",
-                "complete": "vectorize_store"
+                "complete": "vectorize_store",
             }
         )
         
-        # 添加向量化入库边
-        graph.add_edge("vectorize_store", "complete")
+        # 添加从vectorize_store到create_ragflow_chunk的边
+        graph.add_edge("vectorize_store", "create_ragflow_chunk")
+        
+        graph.add_edge("create_ragflow_chunk", "complete")
         
         graph.add_edge("complete", END)
         
         # 编译工作流
         return graph.compile()
     
-    def get_ragflow_dataset(self, dataset_name: str) -> str:
+    def get_ragflow_dataset(self, state: PDFParsingState) -> Dict[str, Any]:
         """获取RAGFLOW数据集ID"""
         try:
-            dataset_id = self.dataset_util.get_dataset(name=dataset_name)
-            print(f"数据集 {dataset_name} 的ID为: {dataset_id}")
-            return dataset_id
+            dataset = self.ragflow_service.get_dataset(name=state.dataset_name)
+            dataset_id = dataset["id"] if dataset else ""
+            print(f"数据集 {state.dataset_name} 的ID为: {dataset_id}")
+            return {
+                "dataset_id": dataset_id
+            }
         except Exception as e:
-            print(f"获取数据集ID时出错: {str(e)}")
-            raise
+            raise Exception(f"获取数据集ID时出错: {str(e)}")
 
-    def create_ragflow_dataset(self, state: PDFParsingState, dataset_name: str) -> str:
-        """创建RAGFLOW数据集"""
-        if state.dataset_id:
-            print(f"数据集 {dataset_name} 已存在,数据集ID: {state.dataset_id}")
-            return state.dataset_id
+    def _check_dataset_exists(self, state: PDFParsingState) -> str:
+        """检查RAGFLOW数据集是否存在"""
+        # 判断state.dataset_id是否为空,为空则返回"not_exists",否则返回"exists"
+        if state.dataset_id == "":
+            return "not_exists"
+        else:
+            return "exists"
         
-        print(f"开始创建数据集: {dataset_name}")
+    def create_ragflow_dataset(self, state: PDFParsingState) -> Dict[str, Any]:
+        """创建RAGFLOW数据集"""
+        print(f"开始创建数据集: {state.dataset_name}")
         
         try: 
             # 创建数据集
-            dataset_id = self.dataset_util.create_dataset(
-                chunk_method="naive",
-                dataset_name=dataset_name,
-                dataset_desc="",
-            )
+            dataset = self.ragflow_service.create_dataset(name=state.dataset_name, description="",
+                                             permission=RagParserDefaults.DATASET_PERMISSION,
+                                             chunk_method=RagParserDefaults.DATASET_CHUNK_METHOD,
+                                             parser_config=RagParserDefaults.DATASET_CONFIG_DICT)
+            dataset_id = dataset["id"]                     
             print(f"数据集创建成功,数据集ID: {dataset_id}")
-            return dataset_id
+            return {
+                "dataset_id": dataset_id
+            }
         except Exception as e:
             print(f"创建数据集时出错: {str(e)}")
             raise
@@ -135,24 +170,42 @@ class PDFParsingWorkflow:
         
         try:
             # 上传文档
-            document_info_list = state.ragflow_service.upload_document(
+            document_info_list = self.ragflow_service.upload_document(
                 dataset_id=state.dataset_id,
                 file_path=state.pdf_path
             )
-            # 上传文档
-            document_info_list2 = state.ragflow_service.upload_document(
-                dataset_id=state.page_dataset_id,
-                file_path=state.pdf_path
-            )
             
             # 检查响应
             if document_info_list and len(document_info_list) > 0:
                 document_id = document_info_list[0]["id"]
-                page_document_id = document_info_list2[0]["id"]
                 print(f"文档上传成功,文档ID: {document_id}")
                 return {
                     "document_id": document_id,
-                    "page_document_id": page_document_id
+                }
+            else:
+                print("文档上传失败: 未返回有效的文档信息")
+                raise Exception("文档上传失败: 未返回有效的文档信息")
+        except Exception as e:
+            print(f"上传文档时出错: {str(e)}")
+            raise
+
+    def _upload_page_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
+        """RAGFLOW上传页面文档节点"""
+        print(f"开始上传页面文档到数据集 {state.dataset_id}: {state.pdf_path}")
+        
+        try:
+            # 上传文档
+            document_info_list = self.ragflow_service.upload_document(
+                dataset_id=state.page_dataset_id,
+                file_path=state.pdf_path
+            ) 
+            
+            # 检查响应
+            if document_info_list and len(document_info_list) > 0:
+                page_document_id = document_info_list[0]["id"]
+                print(f"文档上传成功,文档ID: {page_document_id}")
+                return {
+                    "page_document_id": page_document_id,
                 }
             else:
                 print("文档上传失败: 未返回有效的文档信息")
@@ -167,7 +220,7 @@ class PDFParsingWorkflow:
         
         try:        
             # 解析文档
-            parse_success = state.ragflow_service.parse_document(
+            parse_success = self.ragflow_service.parse_document(
                 dataset_id=state.dataset_id,
                 document_ids=[state.document_id]
             )
@@ -322,26 +375,22 @@ class PDFParsingWorkflow:
         print(f"开始单页上传,共 {len(state.parsed_results)} 页")
         
         # 遍历所有解析结果,上传单页
-        for parsed_result in state.parsed_results:
+        # 遍历所有解析结果,逐页创建RAGFlow Chunk
+        for i, parsed_result in enumerate(state.parsed_results):
             page_number = parsed_result.get("page_number")
             text = parsed_result.get("content", "")
-            image = state.split_pages[page_number - 1].get("image")
+            image_path = state.split_pages[i].get("image_path")
             
             # 上传单页到RagFlow Chunk
-            chunk = state.chunk_util.add_chunk(
-                dataset_name=state.dataset_name,
-                document_id=state.page_document_id,
-                content=text,
-            )
-
-            infinity_client = get_client()
-            infinity_client.update(database_name=state.dataset_name, table_name="", cond=f"id = {chunk_id}", data={"tag_kwd": tag_name})
-            
-            # 检查响应
-            if document_info and document_info.get("id"):
-                print(f"第 {page_number} 页上传成功,文档ID: {document_info['id']}")
-            else:
-                print(f"第 {page_number} 页上传失败")
+            chunk = self.ragflow_service.create_chunk(dataset_id=state.page_dataset_id, 
+                                              document_id=state.page_document_id, 
+                                              content=text)
+            chunk_id = chunk["chunk"]["id"]
+            print(f"上传第 {page_number} 页,Chunk ID: {chunk_id}")
+            # # 睡眠50ms,避免上传过快
+            # time.sleep(0.05)
+            # result = get_client().update(database_name=state.dataset_name, table_name="", cond=f"id = '{chunk_id}'", data={"img_id": img_id})
+            # print(f"更新第 {page_number} 页,Chunk ID: {chunk_id},结果: {result}")
 
     def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
         """向量化入库节点"""
@@ -349,7 +398,7 @@ class PDFParsingWorkflow:
         
         # 创建索引(如果不存在)
         index_name = f"{vector_db_settings.infinity_table_name}"
-        state.vector_db.create_index(index_name)
+        # get_client().create_index()
         
         # 准备要入库的文档列表
         documents_to_store = []
@@ -369,7 +418,7 @@ class PDFParsingWorkflow:
 
                 # 获取多模态嵌入向量
                 print(f"正在生成第 {page_number} 页的多模态嵌入...")
-                embedding = state.embedding_model.get_multimodal_embedding(text, image)
+                embedding = self.embedding_model.get_multimodal_embedding(text, image)
                 
                 # 生成1024维稠密向量(如果嵌入向量维度不是1024,这里需要处理)
                 dense_vector_1024 = embedding[:1024]  # 取前1024维
@@ -395,8 +444,11 @@ class PDFParsingWorkflow:
         # 批量入库
         if documents_to_store:
             print(f"开始入库,共 {len(documents_to_store)} 个文档")
-            infinity_client = get_client()
-            result = infinity_client.insert(index_name, documents_to_store)
+            result = get_client().insert(
+                table_name=vector_db_settings.infinity_table_name,
+                documents=documents_to_store,
+                database_name=vector_db_settings.infinity_database
+            )
             print(f"入库结果: {result}")
         
         return {
@@ -414,7 +466,7 @@ class PDFParsingWorkflow:
             "is_complete": True
         }
     
-    def run(self, pdf_path: str, page_dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]:
+    def run(self, pdf_path: str, page_dataset_id: str, dataset_name: str) -> Dict[str, Any]:
         """
         运行PDF解析工作流
         
@@ -430,13 +482,9 @@ class PDFParsingWorkflow:
         initial_state = PDFParsingState(
             pdf_path=pdf_path,
             page_dataset_id=page_dataset_id,
-            embedding_model=Embedding(model_name=model_settings.multimodal_embedding_model_name, api_key=model_settings.dashscope_api_key),
-            dataset_util=DataSetUtil(),
-            document_util=DocumentUtil(),
-            chunk_util=ChunkUtil(),
-            ragflow_service=RAGFlowService(api_url=ragflow_api_url, api_key=rag_flow_api_key)
+            dataset_name=dataset_name
         )
-        result = self.workflow.invoke(initial_state)
+        result = self.workflow.invoke(initial_state, config={"callbacks": [self.langfuse_handler]})
         
         # 检查结果类型,如果是字典直接返回,否则调用dict()方法
         if isinstance(result, dict):
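
Invocation sketch for the new run() signature, mirroring how DatasetManageService.parse_pdf drives the workflow (the PDF path is hypothetical):

from parser.pdf_parser.pdf_parser_workflow import PDFParsingWorkflow
from conf.settings import vector_db_settings

workflow = PDFParsingWorkflow()
result = workflow.run(
    pdf_path="/tmp/sample.pdf",  # hypothetical path
    page_dataset_id=vector_db_settings.infinity_page_dataset_id,
    dataset_name="demo_series",
)
print(result.get("is_complete"))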

+ 1 - 1
parser/pdf_parser/pdf_splitter.py

@@ -22,7 +22,7 @@ class PDFSplitter:
                 - image_path: MinIO中保存的图片URL
         """
         import os
-        from utils.minio.minio_util import MinIOUtil
+        from utils.file.minio.minio_util import MinIOUtil
         
         try:
             # 初始化MinioUtil

+ 94 - 0
test_ragflow_http_api.py

@@ -0,0 +1,94 @@
+from utils.ragflow.ragflow_service import RAGFlowService
+from utils.infinity import InfinityClient
+import os
+
+DATASET_CONFIG_DICT = {
+            "chunk_token_num": 256,
+            "delimiter": "\n!?;。;!?",
+            "html4excel": False,
+            "layout_recognize": "Pro/Qwen/Qwen2.5-VL-7B-Instruct@SILICONFLOW",
+            "auto_keywords": 5,
+            "tag_kb_ids": [],
+            "topn_tags": 3,
+            "task_page_size": 4,
+            "raptor": {
+                "max_cluster": 64,
+                "max_token": 256,
+                "prompt": "Please summarize the following paragraphs. Be careful with the numbers, do not make things up. Paragraphs as following:\n      {cluster_content}\nThe above is the content you need to summarize.",
+                "random_seed": 0,
+                "threshold": 0.1,
+                "use_raptor": True
+            },
+            "graphrag": {
+                "resolution": True,
+                "use_graphrag": True,
+                "method": "general",
+                "entity_types": [
+                    "event",
+                    "Book",
+                    "Author",
+                    "Illustrator",
+                    "Series",
+                    "Theme",
+                    "Genre",
+                    "Character",
+                    "Setting",
+                    "AgeGroup",
+                    "Competency",
+                    "ArtStyle",
+                    "Award",
+                    "Publisher"
+                ]
+            }
+        }
+ragflow_service = RAGFlowService()
+
+dataset_ids = [
+    "c2be78a4f10711f095230242c0a85002"
+]
+
+def create_dataset():
+    dataset = ragflow_service.create_dataset(name="test_http_dataset1", description="测试HTTP数据集1",
+                                             permission="team",
+                                             chunk_method="naive",
+                                             parser_config=DATASET_CONFIG_DICT)
+    print(dataset)
+
+def delete_dataset(dataset_id: str):
+    flg = ragflow_service.delete_datasets(dataset_ids=[dataset_id])
+    print(flg)
+
+def delete_datasets(dataset_ids: list[str]):
+    flg = ragflow_service.delete_datasets(dataset_ids=dataset_ids)
+    print(flg)
+
+def list_datasets(name: str = None):
+    datasets = ragflow_service.list_datasets(name=name)
+    print(datasets)
+
+def get_dataset(name: str = None, dataset_id: str = None):
+    dataset = ragflow_service.get_dataset(name=name, dataset_id=dataset_id)
+    print(dataset)
+
+def add_chunk(dataset_id: str, document_id: str, content: str, important_keywords: list[str] = None):
+    chunk = ragflow_service.create_chunk(dataset_id=dataset_id, document_id=document_id, content=content, important_keywords=important_keywords)
+    print(chunk)
+    return chunk
+
+def test_image():
+    url = "http://192.168.16.134:9000/bookpage/daa1861c-2096-42c0-b8e3-a163f96f0f66.png?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ck7I8Esssx6rzZrXQ5uP%2F20260109%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20260109T074307Z&X-Amz-Expires=3600&X-Amz-SignedHeaders=host&X-Amz-Signature=6150ffc414cccbedc255bc0a72d85fd4e693a59b112789af61e8a0e93d00e5dc"
+    # 截取url中的daa1861c-2096-42c0-b8e3-a163f96f0f66.png部分
+    img_id = os.path.basename(url).split("?")[0]
+    print(img_id)
+
+if __name__ == "__main__":
+    test_image()
+    # dataset_id = "18caf531f04d11f095670242c0a85002"
+    # document_id = "3dda0a90f1e211f0a3b80242c0a85002"
+    # tag = "社会L3_人际交往L3_同理心L3"	
+    # content="能感知他人情绪,对同伴的困难产生理解并尝试回应"
+    # important_keywords = ["3", "4"]
+    # chunk = add_chunk(dataset_id=dataset_id, document_id=document_id, content=content, important_keywords=important_keywords)
+    # chunk_id = chunk["chunk"]["id"]
+    # infinity_client = InfinityClient()
+    # infinity_client.update(database_name="default_db", table_name="ragflow_92162247e93e11f084830242ac1d0002_18caf531f04d11f095670242c0a85002", cond=f"id = '{chunk_id}'", data={"tag_kwd": tag})
+    # print(chunk_id)

+ 97 - 0
test_search.py

@@ -0,0 +1,97 @@
+import time
+from langchain.chat_models import init_chat_model
+from model.multimodal_embedding import Embedding
+from conf.settings import model_settings, ragflow_settings, tag_search_settings
+from utils.infinity import get_client
+from model.jina_rerank import JinaRerank
+from langchain_core.documents import Document
+from utils.infinity.result_util import convert_to_langchain_docs
+
+# 初始化多模态嵌入模型
+embedding_model = Embedding(
+    model_name=model_settings.embedding_model_name,
+    api_key=model_settings.api_key
+)
+
+llm = init_chat_model(
+    model_provider=model_settings.model_provider,
+    model=model_settings.model_name,
+    base_url=model_settings.base_url,
+    api_key=model_settings.api_key,
+    temperature=0.3
+)
+
+compressor = JinaRerank(
+    base_url=model_settings.base_url,
+    jina_api_key=model_settings.api_key,
+    model=model_settings.rank_model_name, # 建议先确认硅基后台此模型 ID 是否正确
+    top_n=5
+)
+
+output_fields = [
+                    "important_keywords",
+                    "content",
+                    "kb_id",
+                    "doc_id"
+                ]
+
+search_query = {
+    "matching_text": """
+     莉莉兰中都有哪些小虫虫
+    """,
+    "query_vector": [],
+    "vector_field": "q_1024_vec",
+    "match_field": "content",
+    "topn": 5
+}
+
+def main():
+    infinity_client = get_client(database="ragflow_db")
+    # 问题向量化,记录开始时间
+    start_time = time.time()
+    embedding = embedding_model.get_text_embedding(search_query["matching_text"])
+    embedding_time = time.time() - start_time
+    print(f"向量化耗时: {embedding_time:.4f} 秒")
+    search_query["query_vector"] = embedding
+    # TAG_TABLE_NAME="ragflow_92162247e93e11f084830242ac1d0002_52275b36f03611f0a5340242c0a85002"
+    TAG_TABLE_NAME="ragflow_92162247e93e11f084830242ac1d0002_6d2e0990f28b11f0b5200242c0a85002"
+    results = infinity_client.vector_search(TAG_TABLE_NAME, output_fields, search_query).to_result()
+    candidate_docs = convert_to_langchain_docs(results)
+    # print(candidate_docs)
+    
+    # # 4. 直接调用重排序逻辑
+    reranked_docs = compressor.compress_documents(
+        documents=candidate_docs, 
+        query=search_query["matching_text"],
+        top_n=3
+    )
+    # # print(reranked_docs)
+    # # 5. 查看结果
+    for i, doc in enumerate(reranked_docs):
+        print(f"排名 {i+1}: 分数 {doc.metadata['relevance_score']:.4f}")
+        print(f"内容: {doc.page_content}")
+        print(f"标签: {doc.metadata['important_keywords']}\n")
+    
+    # 记录结束时间并计算执行时间
+    end_time = time.time()
+    execution_time = end_time - start_time
+    print(f"执行时间: {execution_time:.4f} 秒")
+
+# from utils.asymmetric_encryption import AsymmetricEncryption
+
+# def main2():
+#     # passwd = "zhangqi@lelequ.net"
+#     # loaded_public_pem = AsymmetricEncryption.load_key_from_file(r"D:\project\work\graph_rag_server\public_key.pem")
+#     loaded_private_pem = AsymmetricEncryption.load_key_from_file(r"D:\project\work\graph_rag_server\private_key.pem")
+#     # encrypted = AsymmetricEncryption.encrypt(passwd, loaded_public_pem)
+#     # print(encrypted)
+#     decrypted = AsymmetricEncryption.decrypt(ragflow_settings.ragflow_user_name, loaded_private_pem)
+#     print(decrypted)
+#     # assert decrypted2 == test_message, "使用加载的密钥解密失败!"
+#     # print("✓ 使用加载的密钥加密解密测试通过!")
+
+if __name__ == "__main__":
+    main()

+ 2 - 2
utils/infinity/client.py

@@ -255,8 +255,8 @@ def get_client(
     host: str = vector_db_settings.infinity_host,
     port: str = vector_db_settings.infinity_sdk_port,
     database: str = vector_db_settings.infinity_database,
-    min_connections: int = 2,
-    max_connections: int = 10
+    min_connections: int = 1,
+    max_connections: int = 1
 ) -> InfinityClient:
     """
     获取全局客户端实例(单例模式)
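
With min_connections = max_connections = 1 the pool degenerates to a single shared connection, so concurrent callers serialize on it; per the docstring, get_client() hands back a singleton:

from utils.infinity import get_client

client = get_client()   # defaults come from vector_db_settings
again = get_client()
print(client is again)  # expected True per the singleton docstring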

+ 8 - 4
utils/infinity/pool.py

@@ -70,7 +70,9 @@ class InfinityConnectionPool:
     def _init_connections(self):
         """初始化最小连接数"""
         for _ in range(self.min_connections):
-            self._create_connection()
+            # 初始化时需要获取锁,因为_create_connection现在没有内部锁
+            with self.lock:
+                self._create_connection()
     
     def _create_connection(self) -> Any:
         """创建新连接"""
@@ -94,9 +96,9 @@ class InfinityConnectionPool:
             connection.__dict__['_last_used'] = time.time()
             connection.__dict__['_is_valid'] = True
             
-            with self.lock:
-                self.connections.append(connection)
-                self.connection_count += 1
+            # 注意:这里不需要再获取锁,因为调用此方法时已经在acquire方法中持有了锁
+            self.connections.append(connection)
+            self.connection_count += 1
             
             return connection
         except Exception as e:
@@ -106,6 +108,8 @@ class InfinityConnectionPool:
         """检查连接是否有效"""
         try:
             # 通过执行简单查询检查连接是否有效
+            # 注意:这里不应该在持有锁的情况下执行网络操作
+            # 但由于此方法是在锁内被调用的,我们需要尽量减少操作时间
             connection.get_database(self.database)
             return True
         except Exception:
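
The refactor moves all locking to the call sites: _create_connection now mutates connections and connection_count without taking self.lock, so every caller must already hold it. A sketch of the intended acquire pattern under that invariant (attribute names follow the pool's; the exhausted branch is an assumption, not the pool's actual fallback):

def acquire_sketch(pool):
    with pool.lock:  # the lock is taken exactly once, out here
        if pool.connections:
            return pool.connections.pop()
        if pool.connection_count < pool.max_connections:
            # safe: the lock is already held, matching _create_connection's contract
            return pool._create_connection()
    raise RuntimeError("connection pool exhausted")  # hypothetical fallback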

+ 1 - 6
utils/infinity/result_util.py

@@ -67,12 +67,7 @@ def convert_to_langchain_docs(obj: Any) -> List[Document]:
     # 将数据转换为 LangChain 的 Document 格式
     candidate_docs = [
         Document(page_content=item["content"], 
-            metadata={
-                "docnm": item["docnm"], 
-                "tag_kwd": item["tag_kwd"], 
-                "kb_id": item["kb_id"], 
-                "doc_id": item["doc_id"]
-            }) 
+            metadata={k: v for k, v in item.items() if k != "content"}) 
         for item in res[0]
     ]
     return candidate_docs
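
After this change every field except content survives into Document.metadata (e.g. the important_keywords consumed by test_search.py) rather than a fixed four-key subset; a minimal illustration with a made-up row:

from langchain_core.documents import Document

item = {"content": "page text", "doc_id": "d1", "important_keywords": ["tag"]}
doc = Document(page_content=item["content"],
               metadata={k: v for k, v in item.items() if k != "content"})
print(doc.metadata)  # {'doc_id': 'd1', 'important_keywords': ['tag']}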

+ 6 - 6
utils/ragflow/chunk_service.py

@@ -5,12 +5,12 @@ class ChunkService:
         self.http_client = http_client
     
     def create_chunk(self, dataset_id: str, document_id: str, content: str, 
-                    meta_fields: Dict = None) -> Dict[str, Any]:
+                    important_keywords: List[str]) -> Dict[str, Any]:
         endpoint = f"/api/v1/datasets/{dataset_id}/documents/{document_id}/chunks"
         
         data = {"content": content}
-        if meta_fields is not None:
-            data["meta_fields"] = meta_fields
+        if important_keywords is not None:
+            data["important_keywords"] = important_keywords
         
         response = self.http_client.post(endpoint, json_data=data)
         
@@ -20,14 +20,14 @@ class ChunkService:
             raise Exception(f"创建切片失败: {response.get('message', '未知错误')}")
     
     def update_chunk(self, dataset_id: str, chunk_id: str, content: str = None,
-                    meta_fields: Dict = None) -> Dict[str, Any]:
+                    important_keywords: List[str] = None) -> Dict[str, Any]:
         endpoint = f"/api/v1/datasets/{dataset_id}/chunks/{chunk_id}"
         
         data = {}
         if content is not None:
             data["content"] = content
-        if meta_fields is not None:
-            data["meta_fields"] = meta_fields
+        if important_keywords is not None:
+            data["important_keywords"] = important_keywords
         
         response = self.http_client.post(endpoint, json=data)
         

+ 3 - 1
utils/ragflow/dataset_service.py

@@ -31,7 +31,7 @@ class DatasetService:
     def delete_datasets(self, dataset_ids: List[str]) -> bool:
         endpoint = "/api/v1/datasets"
         
-        response = self.http_client.post(endpoint, json_data={"dataset_ids": dataset_ids})
+        response = self.http_client.delete(endpoint, json_data={"ids": dataset_ids})
         
         if response.get("code") == 0:
             return True
@@ -76,6 +76,8 @@ class DatasetService:
         
         if response.get("code") == 0 and response.get("data"):
             return response["data"]
+        elif response.get("code") == 108:
+            return None
         else:
             raise Exception(f"列出数据集失败: {response.get('message', '未知错误')}")
     

+ 12 - 8
utils/ragflow/ragflow_service.py

@@ -92,8 +92,8 @@ class RAGFlowService:
     
     def create_dataset(self, name: str, description: str = None, 
                       embedding_model: str = None, permission: str = None,
-                      chunk_method: str = None) -> DatasetInfo:
-        return self.dataset_service.create_dataset(name, description, embedding_model, permission, chunk_method)
+                      chunk_method: str = None, parser_config: dict = None) -> DatasetInfo:
+        return self.dataset_service.create_dataset(name, description, embedding_model, permission, chunk_method, parser_config)
     
     def delete_datasets(self, dataset_ids: List[str]) -> bool:
         return self.dataset_service.delete_datasets(dataset_ids)
@@ -107,8 +107,12 @@ class RAGFlowService:
                      desc: bool = True, name: str = None, dataset_id: str = None) -> List[DatasetInfo]:
         return self.dataset_service.list_datasets(page, size, orderby, desc, name, dataset_id)
     
-    def get_dataset(self, dataset_id: str) -> DatasetInfo:
-        return self.dataset_service.get_dataset(dataset_id)
+    def get_dataset(self, name: Optional[str] = None, dataset_id: Optional[str] = None) -> DatasetInfo:
+        _list = self.list_datasets(name=name, dataset_id=dataset_id)
+        if _list is None:
+            return None
+        elif len(_list) > 0:
+            return _list[0]
+        return None
     
     def get_knowledge_graph(self, dataset_id: str) -> Dict[str, Any]:
         return self.dataset_service.get_knowledge_graph(dataset_id)
@@ -167,12 +171,12 @@ class RAGFlowService:
         return self.document_service.parse_document(dataset_id, document_ids)
     
     def create_chunk(self, dataset_id: str, document_id: str, content: str, 
-                    meta_fields: Dict = None) -> ChunkInfo:
-        return self.chunk_service.create_chunk(dataset_id, document_id, content, meta_fields)
+                    important_keywords: List[str] = None) -> ChunkInfo:
+        return self.chunk_service.create_chunk(dataset_id, document_id, content, important_keywords)
     
     def update_chunk(self, dataset_id: str, chunk_id: str, content: str = None,
-                    meta_fields: Dict = None) -> ChunkInfo:
-        return self.chunk_service.update_chunk(dataset_id, chunk_id, content, meta_fields)
+                    important_keywords: List[str] = None) -> ChunkInfo:
+        return self.chunk_service.update_chunk(dataset_id, chunk_id, content, important_keywords)
     
     def delete_chunk(self, dataset_id: str, chunk_id: str) -> bool:
         return self.chunk_service.delete_chunk(dataset_id, chunk_id)

+ 100 - 0
workflow/search/dataset_search_workflow.py

@@ -0,0 +1,100 @@
+from concurrent.futures import ThreadPoolExecutor
+from langgraph.graph import StateGraph, START, END
+from typing import List, Dict, Any
+from pydantic import BaseModel, Field, ConfigDict
+from model.qwen_vl import QWenVLParser
+from model.openai_chat_model import OpenAIChatModel
+from utils.ragflow.ragflow_service import RAGFlowService
+from model.multimodal_embedding import Embedding
+from conf.settings import model_settings, vector_db_settings
+from utils.infinity import get_client
+from langfuse.langchain import CallbackHandler
+
+# 定义工作流状态类
+class DatasetSearchState(BaseModel):
+    """知识库检索工作流状态"""
+    query: str = Field(..., description="用户查询")
+    dataset_ids: List[str] = Field(..., description="知识库ID列表")
+    results: List[Dict[str, Any]] = Field(default_factory=list, description="检索结果列表")
+    intent: str = Field(default="", description="意图识别结果")
+
+# 创建工作流构建器
+class DatasetSearchWorkflow:
+    """知识库检索工作流"""
+    
+    def __init__(self):
+        """
+        初始化知识库检索工作流
+        """
+        self.workflow = self._build_workflow()
+        self.ragflow_service = RAGFlowService()
+        self.langfuse_handler = CallbackHandler()
+        
+    
+    def _build_workflow(self):
+        """构建langgraph工作流(目前仅接入意图识别节点,检索节点待补充)"""
+        # 创建状态图(注意:使用本工作流的DatasetSearchState,而非PDF解析的状态类)
+        graph = StateGraph(DatasetSearchState)
+
+        # 最小可运行的工作流:START -> 意图识别 -> END
+        graph.add_node("recognize_intent", self._recognize_intent_node)
+        graph.add_edge(START, "recognize_intent")
+        graph.add_edge("recognize_intent", END)
+
+        # 编译工作流
+        return graph.compile()
+
+    def _recognize_intent_node(self, state: DatasetSearchState) -> Dict[str, Any]:
+        """意图识别节点:把分类结果写入状态"""
+        return {"intent": self.intent_recognition(state.query)}
+
+    # 意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题
+    def intent_recognition(self, query: str) -> str:
+        """
+        意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题
+        
+        Args:
+            query: 用户查询
+            
+        Returns:
+            str: 意图分类,例如"推荐图书"、"图书内容问答"、"其他问题"
+        """
+        # 初始化 OpenAI Chat 模型
+        chat_model = OpenAIChatModel()
+        # 构建提示模板
+        prompt_template = """
+        你是一个意图分类模型,你的任务是根据用户查询判断其意图是推荐图书、图书内容问答还是其他问题。
+        用户查询:{query}
+        请判断用户意图,并返回分类结果,如果是推荐图书类意图,返回"推荐图书";
+        如果是图书内容问答类意图,返回"图书内容问答";
+        如果是其他问题,返回"其他问题"。
+        注意:
+        1. 推荐图书类意图指的是用户想知道推荐的图书,例如"推荐一本关于Python的图书"。
+        2. 图书内容问答类意图指的是用户想知道图书的具体内容,例如"这本书的作者是谁"。
+        3. 其他问题指的是用户的查询与推荐图书或图书内容问答无关的问题,例如"你好"、"你是谁"等。
+        4. 意图分类结果必须是"推荐图书"、"图书内容问答"或"其他问题"中的一个。
+        """
+        # 格式化提示模板
+        formatted_prompt = prompt_template.format(query=query)
+        # 调用模型生成意图分类(OpenAIChatModel 提供 generate_response,返回 str)
+        response = chat_model.generate_response(formatted_prompt)
+        # 解析模型输出,提取意图分类
+        intent = response.strip()
+        return intent
+    
+    
+    def run(self, query: str, dataset_ids: List[str]) -> Dict[str, Any]:
+        """
+        运行知识库检索工作流
+        
+        Args:
+            query: 用户查询
+            dataset_ids: 知识库ID列表
+            
+        Returns:
+            Dict: 包含最终状态的字典
+        """
+        initial_state = DatasetSearchState(
+            query=query,
+            dataset_ids=dataset_ids
+        )
+        result = self.workflow.invoke(initial_state, config={"callbacks": [self.langfuse_handler]})
+        
+        # 检查结果类型,如果是字典直接返回,否则调用dict()方法
+        if isinstance(result, dict):
+            return result
+        else:
+            return result.dict()
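
With the minimal wiring above, the workflow can be smoke-tested end to end; the query and dataset id are placeholders, and the label returned depends on the configured chat model:

from workflow.search.dataset_search_workflow import DatasetSearchWorkflow

wf = DatasetSearchWorkflow()
final_state = wf.run(query="推荐一本关于Python的图书",
                     dataset_ids=["demo_dataset_id"])  # hypothetical id
print(final_state.get("intent"))  # expected: 推荐图书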

Some files were not shown because too many files changed in this diff