"""Elasticsearch search management."""

import logging
from typing import Any, Dict, List, Optional

from utils.es.base import ESConnection

logger = logging.getLogger(__name__)

# Upper bound Elasticsearch accepts for the kNN ``num_candidates`` option.
_MAX_NUM_CANDIDATES = 10000


class SearchManager:
    """Elasticsearch search manager.

    Provides:
    - full-text search
    - vector similarity search (k-NN)
    - hybrid search (text + vector)
    - highlighting

    Every search method is best-effort: if building or executing the request
    raises, the error is logged and an empty result is returned instead of
    propagating the exception.
    """

    def __init__(self, es_connection: Optional[ESConnection] = None):
        """Initialise the search manager.

        Args:
            es_connection: ES connection instance. When omitted, a new
                ``ESConnection()`` is created.
        """
        self.es_conn = es_connection or ESConnection()
        self.es = self.es_conn.get_client()

    @staticmethod
    def _failure_result(label: str, error: Exception) -> Dict[str, Any]:
        """Log a failed search (with traceback) and return the empty fallback.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the active traceback.

        Args:
            label: Human-readable prefix identifying which search failed.
            error: The exception that was caught.

        Returns:
            Dict: A fresh ``{"hits": {"total": 0, "hits": []}}`` dictionary.
        """
        logger.exception("%s: %s", label, error)
        # NOTE(review): Elasticsearch 7+ responses normally report hits.total
        # as an object ({"value": ..., "relation": ...}); the integer form is
        # kept here for backward compatibility -- confirm what callers expect.
        return {"hits": {"total": 0, "hits": []}}

    def search(self,
               index_name: str,
               query: Dict[str, Any],
               size: int = 10,
               from_: int = 0,
               fields: Optional[List[str]] = None,
               highlight: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Search documents with an arbitrary query.

        Args:
            index_name: Index name.
            query: Query clause placed under the request's ``query`` key.
            size: Number of hits to return.
            from_: Offset of the first hit.
            fields: Optional list of fields to return (sent as ``_source``).
            highlight: Optional highlight configuration.

        Returns:
            Dict: The search response, or an empty result if the search failed.
        """
        try:
            body: Dict[str, Any] = {
                "query": query,
                "size": size,
                "from": from_,
            }
            if fields:
                body["_source"] = fields
            if highlight:
                body["highlight"] = highlight

            return self.es.search(index=index_name, body=body)
        except Exception as exc:  # deliberate best-effort boundary
            return self._failure_result("搜索失败", exc)

    def hybrid_search(self,
                      index_name: str,
                      text_query: str,
                      vector_field: str,
                      vector: List[float],
                      size: int = 10,
                      from_: int = 0,
                      fields: Optional[List[str]] = None,
                      text_weight: float = 0.5,
                      vector_weight: float = 0.5) -> Dict[str, Any]:
        """Hybrid search: vector similarity combined with full-text search.

        Builds a ``bool`` query with two ``should`` clauses: a ``query_string``
        clause boosted by ``text_weight`` and a ``script_score`` clause
        (cosine similarity over ``match_all``) boosted by ``vector_weight``.

        Args:
            index_name: Index name.
            text_query: Text query string (``query_string`` syntax).
            vector_field: Name of the vector field.
            vector: Query vector.
            size: Number of hits to return.
            from_: Offset of the first hit.
            fields: Optional list of fields to return (sent as ``_source``).
            text_weight: Boost applied to the text clause.
            vector_weight: Boost applied to the vector clause.

        Returns:
            Dict: The search response, or an empty result if the search failed.
        """
        try:
            text_clause = {
                "query_string": {
                    "query": text_query,
                    "default_operator": "OR",
                    "boost": text_weight,
                }
            }
            # The script adds 1.0 to the cosine similarity so the score stays
            # non-negative. Because the inner query is match_all, this clause
            # scores every document in the index.
            # NOTE(review): the doc[...] argument form for vector functions is
            # version-dependent in Elasticsearch -- confirm it is supported by
            # the target cluster.
            vector_clause = {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {
                        "source": "cosineSimilarity(params.query_vector, doc[params.vector_field]) + 1.0",
                        "params": {
                            "query_vector": vector,
                            "vector_field": vector_field,
                        },
                    },
                    "boost": vector_weight,
                }
            }

            body: Dict[str, Any] = {
                "query": {"bool": {"should": [text_clause, vector_clause]}},
                "size": size,
                "from": from_,
            }
            if fields:
                body["_source"] = fields

            return self.es.search(index=index_name, body=body)
        except Exception as exc:  # deliberate best-effort boundary
            return self._failure_result("混合检索失败", exc)

    def knn_search(self,
                   index_name: str,
                   vector_field: str,
                   vector: List[float],
                   k: int = 10,
                   filter_query: Optional[Dict[str, Any]] = None,
                   fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Vector similarity search (k-NN).

        Args:
            index_name: Index name.
            vector_field: Name of the vector field.
            vector: Query vector.
            k: Number of nearest neighbours to return.
            filter_query: Optional filter applied to the k-NN search.
            fields: Optional list of fields to return (sent as ``_source``).

        Returns:
            Dict: The search response, or an empty result if the search failed.
        """
        try:
            knn: Dict[str, Any] = {
                "field": vector_field,
                "query_vector": vector,
                "k": k,
                # Oversample 10x for recall, but stay within the server limit.
                "num_candidates": min(k * 10, _MAX_NUM_CANDIDATES),
            }
            if filter_query:
                knn["filter"] = filter_query

            # Without an explicit size the response is truncated to the
            # default page size (10) even when k is larger.
            body: Dict[str, Any] = {"knn": knn, "size": k}
            if fields:
                body["_source"] = fields

            return self.es.search(index=index_name, body=body)
        except Exception as exc:  # deliberate best-effort boundary
            return self._failure_result("向量检索失败", exc)

    def match_search(self,
                     index_name: str,
                     field: str,
                     value: str,
                     size: int = 10,
                     fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Simple ``match`` search on a single field.

        Args:
            index_name: Index name.
            field: Field to match against.
            value: Value to match.
            size: Number of hits to return.
            fields: Optional list of fields to return.

        Returns:
            Dict: The search response, or an empty result if the search failed.
        """
        query = {"match": {field: value}}
        return self.search(index_name, query, size=size, fields=fields)

    def match_all(self,
                  index_name: str,
                  size: int = 10,
                  fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """Match every document in the index.

        Args:
            index_name: Index name.
            size: Number of hits to return.
            fields: Optional list of fields to return.

        Returns:
            Dict: The search response, or an empty result if the search failed.
        """
        query = {"match_all": {}}
        return self.search(index_name, query, size=size, fields=fields)