#!/usr/bin/env python3
"""
Hybrid-retrieval MCP service.

Implemented with the fastmcp framework. Provides vectorized ingestion of
parsed images and hybrid retrieval.
"""

import sys
import os
import requests
from io import BytesIO
from typing import List, Dict, Any
from fastmcp import FastMCP

# Add the project root to the Python path so the project-local packages
# imported below can be resolved when this file is run as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from PIL import Image
from utils.infinity_util import InfinityVectorDB
from model.multimodal_embedding import Embedding
from conf.settings import model_settings, ragflow_settings, vector_db_settings

# Upper bound (seconds) for downloading the query image. Without a timeout
# `requests.get` may block forever on a stalled server.
IMAGE_DOWNLOAD_TIMEOUT_SECONDS = 30

# Initialise the fastmcp application.
mcp = FastMCP("Multi_Vector_Search")

# Initialise the vector database client.
vector_db = InfinityVectorDB()

# Initialise the multimodal embedding model.
embedding_model = Embedding(
    model_name=model_settings.multimodal_embedding_model_name,
    api_key=model_settings.dashscope_api_key
)


def _load_image(image_source: Any) -> Image.Image:
    """Turn the request's ``image`` value into a PIL image.

    Args:
        image_source: Either a URL string to download the image from, or an
            already-constructed ``PIL.Image.Image`` (returned unchanged).

    Returns:
        The PIL image to embed.

    Raises:
        requests.RequestException: The download failed, timed out, or the
            server answered with an HTTP error status.
        ValueError: ``image_source`` is neither a string nor a PIL image.
    """
    if isinstance(image_source, Image.Image):
        return image_source

    if isinstance(image_source, str):
        # NOTE(security): the URL comes from the tool caller and is fetched
        # server-side. If this service is reachable by untrusted clients,
        # restrict the allowed hosts to avoid server-side request forgery.
        response = requests.get(image_source, timeout=IMAGE_DOWNLOAD_TIMEOUT_SECONDS)
        response.raise_for_status()  # Fail on non-2xx HTTP status codes.
        # Wrap the downloaded bytes in a stream that PIL can read from.
        return Image.open(BytesIO(response.content))

    # Previously any non-string value left `image` unbound and surfaced as an
    # UnboundLocalError; report the actual problem instead.
    raise ValueError(f"不支持的image参数类型: {type(image_source).__name__}")


@mcp.tool(name="hybrid_search")
def hybrid_search(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Hybrid search API.

    Builds a multimodal embedding from the text query and the image, then
    runs a hybrid (text + dense vector) search against the index
    ``pdf_documents_<dataset_id>``.

    Args:
        request: Request parameters.
            - ``text_query`` (required): the query text.
            - ``image`` (required): URL of the query image.
            - ``topn`` (optional, default 2): number of results to return.

    Returns:
        On success: ``{"success": True, "message": ..., "output": [...],
        "total": ...}``.
        On any failure: ``{"success": False, "message": <error text>}``.
        Errors are never raised to the caller; they are reported in the
        returned dict.
    """
    try:
        # Parse request parameters.
        text_query = request["text_query"]
        image_url = request["image"]
        topn = request.get("topn", 2)

        print(f"开始混合检索,数据库: {vector_db_settings.infinity_database}, 知识库id: {ragflow_settings.dataset_id}, 文本查询: {text_query}, 返回数量: {topn}")

        # Build the index name.
        index_name = f"pdf_documents_{ragflow_settings.dataset_id}"

        print(f"开始生成多模态嵌入,文本长度: {len(text_query)}")

        # Resolve the `image` parameter into a PIL image.
        image = _load_image(image_url)

        # Generate the multimodal embedding vector.
        embedding = embedding_model.get_multimodal_embedding(text_query, image)

        print(f"多模态嵌入生成完成,向量长度: {len(embedding)}")

        # Run the hybrid search.
        result = vector_db.hybrid_search(
            index_name=index_name,
            match_method="dense",
            vector_field="dense_vector_1024",
            query_vector=embedding,
            element_type="float",
            metric_type="cosine",
            topn=topn,
            text_query=text_query,
            text_field="content"
        )

        print(f"混合检索完成,总命中数: {result.get('total', 0)}")

        # Return the success response.
        # NOTE(review): the log line above falls back to 0 when "total" is
        # missing, while the response falls back to `topn`. Kept as-is to
        # preserve the response contract; confirm which default is intended.
        return {
            "success": True,
            "message": "混合检索成功",
            "output": result.get("output", []),
            "total": result.get("total", topn)
        }

    except Exception as e:
        # Tool boundary: convert every failure into an error payload so the
        # MCP client always receives a well-formed response.
        print(f"混合检索失败: {str(e)}")
        return {
            "success": False,
            "message": str(e)
        }


if __name__ == "__main__":
    mcp.run(transport="sse", host="0.0.0.0", port=18000)
    # Alternative (disabled): start an HTTP server by running the app under uvicorn.
    # import uvicorn
    # uvicorn.run(mcp.http_app, host="0.0.0.0", port=18000, transport="stdio")