dataset_search_workflow.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. from concurrent.futures import ThreadPoolExecutor
  2. from langgraph.graph import StateGraph, START, END
  3. from typing import List, Dict, Any
  4. from pydantic import BaseModel, Field, ConfigDict
  5. from model.qwen_vl import QWenVLParser
  6. from model.openai_chat_model import OpenAIChatModel
  7. from utils.ragflow.ragflow_service import RAGFlowService
  8. from model.multimodal_embedding import Embedding
  9. from conf.settings import model_settings, vector_db_settings
  10. from utils.infinity import get_client
  11. from langfuse.langchain import CallbackHandler
  12. # 定义工作流状态类
  13. class DatasetSearchState(BaseModel):
  14. """知识库检索工作流状态"""
  15. query: str = Field(..., description="用户查询")
  16. dataset_ids: List[str] = Field(..., description="知识库ID列表")
  17. results: List[Dict[str, Any]] = Field(default_factory=list, description="检索结果列表")
  18. # 创建工作流构建器
  19. class DatasetSearchWorkflow:
  20. """知识库检索工作流"""
  21. def __init__(self):
  22. """
  23. 初始化PDF解析工作流
  24. """
  25. self.model_name = model_name
  26. self.workflow = self._build_workflow()
  27. self.ragflow_service = RAGFlowService()
  28. self.langfuse_handler = CallbackHandler()
  29. def _build_workflow(self):
  30. """构建langgraph工作流,实现基于条件路由的并行处理"""
  31. # 创建状态图
  32. graph = StateGraph(PDFParsingState)
  33. # 编译工作流
  34. return graph.compile()
  35. # 意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题
  36. def intent_recognition(self, query: str) -> str:
  37. """
  38. 意图识别,判断用户问题是图书推荐类、还是图书内容问答类、还是其他问题
  39. Args:
  40. query: 用户查询
  41. Returns:
  42. str: 意图分类,例如"推荐图书"、"图书内容问答"、"其他问题"
  43. """
  44. # 初始化 OpenAI Chat 模型
  45. chat_model = OpenAIChatModel()
  46. # 构建提示模板
  47. prompt_template = """
  48. 你是一个意图分类模型,你的任务是根据用户查询判断其意图是推荐图书、图书内容问答还是其他问题。
  49. 用户查询:{query}
  50. 请判断用户意图,并返回分类结果,如果是推荐图书类意图,返回"Recommend_books";
  51. 如果是图书内容问答类意图,返回"图书内容问答";
  52. 如果是其他问题,返回"其他问题"。
  53. 注意:
  54. 1. 推荐图书类意图指的是用户想知道推荐的图书,例如"推荐一本关于Python的图书"。
  55. 2. 图书内容问答类意图指的是用户想知道图书的具体内容,例如"这本书的作者是谁"。
  56. 3. 其他问题指的是用户的查询与推荐图书或图书内容问答无关的问题,例如"你好"、"你是谁"等。
  57. 4. 意图分类结果必须是"推荐图书"、"图书内容问答"或"其他问题"中的一个。
  58. """
  59. # 格式化提示模板
  60. formatted_prompt = prompt_template.format(query=query)
  61. # 调用模型生成意图分类
  62. response = chat_model.invoke(formatted_prompt)
  63. # 解析模型输出,提取意图分类
  64. intent = response.content.strip()
  65. return intent
  66. def run(self, query: str, dataset_ids: List[str]) -> Dict[str, Any]:
  67. """
  68. 运行知识库检索工作流
  69. Args:
  70. query: 用户查询
  71. dataset_ids: 知识库ID列表
  72. Returns:
  73. Dict: 包含最终状态的字典
  74. """
  75. initial_state = DatasetSearchState(
  76. query=query,
  77. dataset_ids=dataset_ids
  78. )
  79. result = self.workflow.invoke(initial_state, config={"callbacks": [self.langfuse_handler]})
  80. # 检查结果类型,如果是字典直接返回,否则调用dict()方法
  81. if isinstance(result, dict):
  82. return result
  83. else:
  84. return result.dict()