from concurrent.futures import ThreadPoolExecutor from langgraph.graph import StateGraph, START, END from typing import List, Dict, Any from pydantic import BaseModel, Field, ConfigDict from model.qwen_vl import QWenVLParser from model.openai_chat_model import OpenAIChatModel from utils.ragflow.ragflow_service import RAGFlowService from model.multimodal_embedding import Embedding from conf.settings import model_settings, vector_db_settings from utils.infinity import get_client from langfuse.langchain import CallbackHandler # 定义工作流状态类 class DatasetSearchState(BaseModel): """知识库检索工作流状态""" query: str = Field(..., description="用户查询") dataset_ids: List[str] = Field(..., description="知识库ID列表") results: List[Dict[str, Any]] = Field(default_factory=list, description="检索结果列表") # 创建工作流构建器 class DatasetSearchWorkflow: """知识库检索工作流""" def __init__(self): """ 初始化PDF解析工作流 """ self.model_name = model_name self.workflow = self._build_workflow() self.ragflow_service = RAGFlowService() self.langfuse_handler = CallbackHandler() def _build_workflow(self): """构建langgraph工作流,实现基于条件路由的并行处理""" # 创建状态图 graph = StateGraph(PDFParsingState) # 编译工作流 return graph.compile() # 意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题 def intent_recognition(self, query: str) -> str: """ 意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题 Args: query: 用户查询 Returns: str: 意图分类,例如"推荐图书"、"图书内容问答"、"其他问题" """ # 初始化 OpenAI Chat 模型 chat_model = OpenAIChatModel() # 构建提示模板 prompt_template = """ 你是一个意图分类模型,你的任务是根据用户查询判断其意图是推荐图书、图书内容问答还是其他问题。 用户查询:{query} 请判断用户意图,并返回分类结果,如果是推荐图书类意图,返回"Recommend_books"; 如果是图书内容问答类意图,返回"图书内容问答"; 如果是其他问题,返回"其他问题"。 注意: 1. 推荐图书类意图指的是用户想知道推荐的图书,例如"推荐一本关于Python的图书"。 2. 图书内容问答类意图指的是用户想知道图书的具体内容,例如"这本书的作者是谁"。 3. 其他问题指的是用户的查询与推荐图书或图书内容问答无关的问题,例如"你好"、"你是谁"等。 4. 意图分类结果必须是"推荐图书"、"图书内容问答"或"其他问题"中的一个。 """ # 格式化提示模板 formatted_prompt = prompt_template.format(query=query) # 调用模型生成意图分类 response = chat_model.invoke(formatted_prompt) # 解析模型输出,提取意图分类 intent = response.content.strip() return intent def run(self, query: str, dataset_ids: List[str]) -> Dict[str, Any]: """ 运行知识库检索工作流 Args: query: 用户查询 dataset_ids: 知识库ID列表 Returns: Dict: 包含最终状态的字典 """ initial_state = DatasetSearchState( query=query, dataset_ids=dataset_ids ) result = self.workflow.invoke(initial_state, config={"callbacks": [self.langfuse_handler]}) # 检查结果类型,如果是字典直接返回,否则调用dict()方法 if isinstance(result, dict): return result else: return result.dict()