# Infinity搜索API服务 from fastapi import FastAPI, HTTPException from src.api.db.services.vector_search_service import VectorSearchService from src.utils.vector_db import get_vector_db_client from src.common.result import Result from src.utils.async_utils import run_in_threadpool from src.api.db.models import SearchRequest from src.conf.settings import vector_db_settings # 创建FastAPI应用 app = FastAPI( title="Vector Search API", description="向量数据库搜索API服务(支持 Infinity / Elasticsearch)", version="2.0.0" ) # 1. 普通搜索接口 @app.post("/text") async def search(request: SearchRequest): """ 普通搜索接口 - **table_name**: 表名 - **output_fields**: 要返回的字段列表 - **query**: 查询条件,包含field、query和topn字段 - **database_name**: 数据库名称(可选,默认使用客户端配置的数据库) """ try: search_service = VectorSearchService(client=get_vector_db_client()) result = await run_in_threadpool(search_service.search, request) return Result.success(data=result, message="搜索成功") except Exception as e: return Result.error(code=500, message=f"搜索失败: {str(e)}") # 2. 向量搜索接口 @app.post("/vector") async def vector_search(request: SearchRequest): """ 向量搜索接口 - **table_name**: 表名 - **output_fields**: 要返回的字段列表 - **query**: 查询条件,包含vector_field、query_vector和topn字段 - **database_name**: 数据库名称(可选,默认使用客户端配置的数据库) """ try: search_service = VectorSearchService(client=get_vector_db_client()) result = await run_in_threadpool(search_service.vector_search, request) return Result.success(data=result, message="向量搜索成功") except Exception as e: return Result.error(code=500, message=f"向量搜索失败: {str(e)}") # 3. 混合搜索接口 @app.post("/hybrid") async def hybrid_search(request: SearchRequest): """ 混合搜索接口 - **table_name**: 表名 - **output_fields**: 要返回的字段列表 - **query**: 查询条件,包含vector_field、query_vector、field、query、topn和fusion_weight字段 - **database_name**: 数据库名称(可选,默认使用客户端配置的数据库) """ try: search_service = VectorSearchService(client=get_vector_db_client()) result = await run_in_threadpool(search_service.hybrid_search, request) return Result.success(data=result, message="混合搜索成功") except Exception as e: return Result.error(code=500, message=f"混合搜索失败: {str(e)}") # 4. 问答对检索 @app.post("/question") async def question_search(request: SearchRequest): """ 问答对检索接口 - **output_fields**: 要返回的字段列表 - **query**: 查询条件,包含vector_field、query_vector、field、query、topn和fusion_weight字段 - **database_name**: 数据库名称(可选,默认使用客户端配置的数据库) """ try: output_fields = ["content_with_weight"] search_service = VectorSearchService( client=get_vector_db_client(database="ragflow_db"), table_name="ragflow_f3abf26bf80c11f0953d0242ac180002", vector_field="q_1024_vec", output_fields=output_fields ) result = await run_in_threadpool(search_service.hybrid_search, request) return Result.success(data=result, message="问答对检索成功") except Exception as e: return Result.error(code=500, message=f"问答对检索失败: {str(e)}")