""" Elasticsearch 连接管理器(向后兼容接口) 该文件提供与旧版 es_conn.py 兼容的接口,同时内部使用新的工程化模块。 """ import re import json import time from typing import Any, List, Dict, Optional, Union from elasticsearch import Elasticsearch, helpers from elasticsearch.helpers import bulk, BulkIndexError from elastic_transport import ConnectionTimeout from elasticsearch.exceptions import NotFoundError from services.utils.es.base import ESConnection as _ESConnection from services.utils.es.index import IndexManager from services.utils.es.document import DocumentManager from services.utils.es.search import SearchManager # 单例装饰器 class singleton: def __init__(self, cls): self.cls = cls self._instance = None def __call__(self, *args, **kwargs): if self._instance is None: self._instance = self.cls(*args, **kwargs) return self._instance @singleton class ESConnection: """ Elasticsearch 连接管理器(向后兼容) 支持: - 单例模式 - 连接池管理 - CRUD操作 - 向量相似度检索 + 全文检索的混合检索 - 动态模板映射 """ def __init__(self, hosts: List[str] = None, **kwargs): """ 初始化 Elasticsearch 连接 Args: hosts: Elasticsearch 主机列表,格式如 ["http://localhost:9200"] **kwargs: 其他 Elasticsearch 客户端配置参数 """ # 使用新的 ESConnection 作为底层连接 self._es_conn = _ESConnection(hosts=hosts, **kwargs) # 初始化管理器 self.index_manager = IndexManager(self._es_conn) self.document_manager = DocumentManager(self._es_conn) self.search_manager = SearchManager(self._es_conn) # 向后兼容属性 self.es = self._es_conn.get_client() self.dynamic_templates = self._es_conn.dynamic_templates def _get_dynamic_templates(self) -> Dict[str, Any]: """ 获取动态模板映射配置(向后兼容方法) """ return self.dynamic_templates def create_index(self, index_name: str, mappings: Dict[str, Any] = None, settings: Dict[str, Any] = None) -> bool: """ 创建索引 """ return self.index_manager.create_index(index_name, mappings, settings) def insert(self, index_name: str, document: Dict[str, Any], id: str = None, refresh: bool = False) -> bool: """ 插入单个文档 """ return self.document_manager.insert(index_name, document, id, refresh) def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]], refresh: bool = False) -> Dict[str, Any]: """ 批量插入文档 """ return self.document_manager.bulk_insert(index_name, documents, refresh) def update(self, index_name: str, id: str, update_body: Dict[str, Any], refresh: bool = False) -> bool: """ 更新文档 """ return self.document_manager.update(index_name, id, update_body, refresh) def delete(self, index_name: str, id: str, refresh: bool = False) -> bool: """ 删除文档 """ return self.document_manager.delete(index_name, id, refresh) def delete_by_query(self, index_name: str, query: Dict[str, Any], refresh: bool = False) -> Dict[str, Any]: """ 按查询条件删除文档 """ return self.document_manager.delete_by_query(index_name, query, refresh) def get(self, index_name: str, id: str, fields: List[str] = None) -> Optional[Dict[str, Any]]: """ 获取单个文档 """ return self.document_manager.get(index_name, id, fields) def search(self, index_name: str, query: Dict[str, Any], size: int = 10, from_: int = 0, fields: List[str] = None, highlight: Dict[str, Any] = None) -> Dict[str, Any]: """ 搜索文档 """ return self.search_manager.search(index_name, query, size, from_, fields, highlight) def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float], size: int = 10, from_: int = 0, fields: List[str] = None, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]: """ 混合检索:向量相似度检索 + 全文检索 """ return self.search_manager.hybrid_search(index_name, text_query, vector_field, vector, size, from_, fields, text_weight, vector_weight) def knn_search(self, index_name: str, vector_field: str, vector: List[float], k: int = 10, filter_query: Dict[str, Any] = None) -> Dict[str, Any]: """ 向量相似度检索(k-NN) """ return self.search_manager.knn_search(index_name, vector_field, vector, k, filter_query) def close(self): """ 关闭 Elasticsearch 连接 """ self._es_conn.close()