| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- from concurrent.futures import ThreadPoolExecutor
- from langgraph.graph import StateGraph, START, END
- from typing import List, Dict, Any
- from pydantic import BaseModel, Field, ConfigDict
- from model.qwen_vl import QWenVLParser
- from model.openai_chat_model import OpenAIChatModel
- from utils.ragflow.ragflow_service import RAGFlowService
- from model.multimodal_embedding import Embedding
- from conf.settings import model_settings, vector_db_settings
- from utils.infinity import get_client
- from langfuse.langchain import CallbackHandler
- # 定义工作流状态类
- class DatasetSearchState(BaseModel):
- """知识库检索工作流状态"""
- query: str = Field(..., description="用户查询")
- dataset_ids: List[str] = Field(..., description="知识库ID列表")
- results: List[Dict[str, Any]] = Field(default_factory=list, description="检索结果列表")
- # 创建工作流构建器
- class DatasetSearchWorkflow:
- """知识库检索工作流"""
-
- def __init__(self):
- """
- 初始化PDF解析工作流
- """
- self.model_name = model_name
- self.workflow = self._build_workflow()
- self.ragflow_service = RAGFlowService()
- self.langfuse_handler = CallbackHandler()
-
-
- def _build_workflow(self):
- """构建langgraph工作流,实现基于条件路由的并行处理"""
- # 创建状态图
- graph = StateGraph(PDFParsingState)
-
-
- # 编译工作流
- return graph.compile()
- # 意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题
- def intent_recognition(self, query: str) -> str:
- """
- 意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题
-
- Args:
- query: 用户查询
-
- Returns:
- str: 意图分类,例如"推荐图书"、"图书内容问答"、"其他问题"
- """
- # 初始化 OpenAI Chat 模型
- chat_model = OpenAIChatModel()
- # 构建提示模板
- prompt_template = """
- 你是一个意图分类模型,你的任务是根据用户查询判断其意图是推荐图书、图书内容问答还是其他问题。
- 用户查询:{query}
- 请判断用户意图,并返回分类结果,如果是推荐图书类意图,返回"Recommend_books";
- 如果是图书内容问答类意图,返回"图书内容问答";
- 如果是其他问题,返回"其他问题"。
- 注意:
- 1. 推荐图书类意图指的是用户想知道推荐的图书,例如"推荐一本关于Python的图书"。
- 2. 图书内容问答类意图指的是用户想知道图书的具体内容,例如"这本书的作者是谁"。
- 3. 其他问题指的是用户的查询与推荐图书或图书内容问答无关的问题,例如"你好"、"你是谁"等。
- 4. 意图分类结果必须是"推荐图书"、"图书内容问答"或"其他问题"中的一个。
- """
- # 格式化提示模板
- formatted_prompt = prompt_template.format(query=query)
- # 调用模型生成意图分类
- response = chat_model.invoke(formatted_prompt)
- # 解析模型输出,提取意图分类
- intent = response.content.strip()
- return intent
-
-
- def run(self, query: str, dataset_ids: List[str]) -> Dict[str, Any]:
- """
- 运行知识库检索工作流
-
- Args:
- query: 用户查询
- dataset_ids: 知识库ID列表
-
- Returns:
- Dict: 包含最终状态的字典
- """
- initial_state = DatasetSearchState(
- query=query,
- dataset_ids=dataset_ids
- )
- result = self.workflow.invoke(initial_state, config={"callbacks": [self.langfuse_handler]})
-
- # 检查结果类型,如果是字典直接返回,否则调用dict()方法
- if isinstance(result, dict):
- return result
- else:
- return result.dict()
|