| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- """
- Elasticsearch 连接管理器(向后兼容接口)
- 该文件提供与旧版 es_conn.py 兼容的接口,同时内部使用新的工程化模块。
- """
- import re
- import json
- import time
- from typing import Any, List, Dict, Optional, Union
- from elasticsearch import Elasticsearch, helpers
- from elasticsearch.helpers import bulk, BulkIndexError
- from elastic_transport import ConnectionTimeout
- from elasticsearch.exceptions import NotFoundError
- from services.utils.es.base import ESConnection as _ESConnection
- from services.utils.es.index import IndexManager
- from services.utils.es.document import DocumentManager
- from services.utils.es.search import SearchManager
- # 单例装饰器
class singleton:
    """Class decorator that lazily builds one instance and then reuses it.

    Applying ``@singleton`` to a class rebinds the class name to an instance
    of this wrapper. Calling that name constructs the wrapped class on the
    first call and returns the same object on every later call.

    Notes:
        * Arguments passed on any call after the first are ignored, because
          the cached instance is returned without re-running the constructor.
        * The decorated name is no longer a class. Use ``.cls`` (or the
          standard ``.__wrapped__``) to reach the real class, e.g. for
          ``isinstance`` checks.
        * No locking is performed around the first construction.
    """

    def __init__(self, cls):
        """Remember the class to instantiate and mirror its metadata.

        Args:
            cls: The class whose single instance this wrapper manages.
        """
        # Function-scope import so this block does not depend on the file's
        # top-level import list.
        import functools

        # Copy __module__/__name__/__qualname__/__doc__ from the wrapped class
        # and expose it as __wrapped__, so introspection of the decorated name
        # still describes the real class. updated=() skips merging
        # cls.__dict__ into this wrapper's own attributes.
        functools.update_wrapper(self, cls, updated=())
        self.cls = cls
        self._instance = None

    def __call__(self, *args, **kwargs):
        """Return the shared instance, creating it on the first call.

        Args:
            *args: Positional arguments forwarded to the wrapped class, used
                only when the instance does not exist yet.
            **kwargs: Keyword arguments forwarded likewise.

        Returns:
            The single cached instance of the wrapped class.
        """
        if self._instance is None:
            self._instance = self.cls(*args, **kwargs)
        return self._instance
@singleton
class ESConnection:
    """Backward-compatible Elasticsearch connection manager.

    Keeps the interface of the legacy ``es_conn.py`` module while delegating
    every operation to the newer index / document / search manager objects.

    Features exposed through this facade:
    - singleton access (via the ``@singleton`` decorator)
    - connection pool management
    - CRUD operations
    - hybrid retrieval (vector similarity + full-text search)
    - dynamic template mappings
    """

    def __init__(self, hosts: List[str] = None, **kwargs):
        """Create the underlying connection and the managers built on it.

        Args:
            hosts: Elasticsearch host list, e.g. ["http://localhost:9200"].
            **kwargs: Additional Elasticsearch client configuration, passed
                through to the underlying connection unchanged.
        """
        # The new-style connection object backs everything in this facade.
        connection = _ESConnection(hosts=hosts, **kwargs)
        self._es_conn = connection

        # One manager per area of responsibility, all sharing the connection.
        self.index_manager = IndexManager(connection)
        self.document_manager = DocumentManager(connection)
        self.search_manager = SearchManager(connection)

        # Attributes that legacy callers read directly.
        self.es = connection.get_client()
        self.dynamic_templates = connection.dynamic_templates

    def _get_dynamic_templates(self) -> Dict[str, Any]:
        """Return the dynamic template mapping config (legacy accessor)."""
        templates = self.dynamic_templates
        return templates

    def create_index(self, index_name: str, mappings: Dict[str, Any] = None, settings: Dict[str, Any] = None) -> bool:
        """Create an index by delegating to the index manager."""
        indices = self.index_manager
        return indices.create_index(index_name, mappings, settings)

    def insert(self, index_name: str, document: Dict[str, Any], id: str = None, refresh: bool = False) -> bool:
        """Insert a single document by delegating to the document manager."""
        documents_api = self.document_manager
        return documents_api.insert(index_name, document, id, refresh)

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]], refresh: bool = False) -> Dict[str, Any]:
        """Insert a batch of documents by delegating to the document manager."""
        documents_api = self.document_manager
        return documents_api.bulk_insert(index_name, documents, refresh)

    def update(self, index_name: str, id: str, update_body: Dict[str, Any], refresh: bool = False) -> bool:
        """Update a document by delegating to the document manager."""
        documents_api = self.document_manager
        return documents_api.update(index_name, id, update_body, refresh)

    def delete(self, index_name: str, id: str, refresh: bool = False) -> bool:
        """Delete a document by delegating to the document manager."""
        documents_api = self.document_manager
        return documents_api.delete(index_name, id, refresh)

    def delete_by_query(self, index_name: str, query: Dict[str, Any], refresh: bool = False) -> Dict[str, Any]:
        """Delete the documents matching a query via the document manager."""
        documents_api = self.document_manager
        return documents_api.delete_by_query(index_name, query, refresh)

    def get(self, index_name: str, id: str, fields: List[str] = None) -> Optional[Dict[str, Any]]:
        """Fetch a single document by delegating to the document manager."""
        documents_api = self.document_manager
        return documents_api.get(index_name, id, fields)

    def search(self, index_name: str, query: Dict[str, Any], size: int = 10, from_: int = 0,
               fields: List[str] = None, highlight: Dict[str, Any] = None) -> Dict[str, Any]:
        """Search documents by delegating to the search manager."""
        searcher = self.search_manager
        return searcher.search(
            index_name,
            query,
            size,
            from_,
            fields,
            highlight,
        )

    def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
                      size: int = 10, from_: int = 0, fields: List[str] = None,
                      text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
        """Run hybrid retrieval: vector similarity combined with full-text search.

        All arguments are forwarded positionally, in order, to the search
        manager's ``hybrid_search``.
        """
        searcher = self.search_manager
        return searcher.hybrid_search(
            index_name,
            text_query,
            vector_field,
            vector,
            size,
            from_,
            fields,
            text_weight,
            vector_weight,
        )

    def knn_search(self, index_name: str, vector_field: str, vector: List[float],
                   k: int = 10, filter_query: Dict[str, Any] = None) -> Dict[str, Any]:
        """Run a vector similarity (k-NN) search via the search manager."""
        searcher = self.search_manager
        return searcher.knn_search(
            index_name,
            vector_field,
            vector,
            k,
            filter_query,
        )

    def close(self):
        """Close the Elasticsearch connection.

        NOTE(review): the ``@singleton`` wrapper keeps returning this same
        instance after ``close()``; confirm whether callers expect a fresh
        connection on the next ``ESConnection()`` call.
        """
        connection = self._es_conn
        connection.close()
|