es_conn.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. """
  2. Elasticsearch 连接管理器(向后兼容接口)
  3. 该文件提供与旧版 es_conn.py 兼容的接口,同时内部使用新的工程化模块。
  4. """
  5. import re
  6. import json
  7. import time
  8. from typing import Any, List, Dict, Optional, Union
  9. from elasticsearch import Elasticsearch, helpers
  10. from elasticsearch.helpers import bulk, BulkIndexError
  11. from elastic_transport import ConnectionTimeout
  12. from elasticsearch.exceptions import NotFoundError
  13. from services.utils.es.base import ESConnection as _ESConnection
  14. from services.utils.es.index import IndexManager
  15. from services.utils.es.document import DocumentManager
  16. from services.utils.es.search import SearchManager
  17. # 单例装饰器
  18. class singleton:
  19. def __init__(self, cls):
  20. self.cls = cls
  21. self._instance = None
  22. def __call__(self, *args, **kwargs):
  23. if self._instance is None:
  24. self._instance = self.cls(*args, **kwargs)
  25. return self._instance
  26. @singleton
  27. class ESConnection:
  28. """
  29. Elasticsearch 连接管理器(向后兼容)
  30. 支持:
  31. - 单例模式
  32. - 连接池管理
  33. - CRUD操作
  34. - 向量相似度检索 + 全文检索的混合检索
  35. - 动态模板映射
  36. """
  37. def __init__(self, hosts: List[str] = None, **kwargs):
  38. """
  39. 初始化 Elasticsearch 连接
  40. Args:
  41. hosts: Elasticsearch 主机列表,格式如 ["http://localhost:9200"]
  42. **kwargs: 其他 Elasticsearch 客户端配置参数
  43. """
  44. # 使用新的 ESConnection 作为底层连接
  45. self._es_conn = _ESConnection(hosts=hosts, **kwargs)
  46. # 初始化管理器
  47. self.index_manager = IndexManager(self._es_conn)
  48. self.document_manager = DocumentManager(self._es_conn)
  49. self.search_manager = SearchManager(self._es_conn)
  50. # 向后兼容属性
  51. self.es = self._es_conn.get_client()
  52. self.dynamic_templates = self._es_conn.dynamic_templates
  53. def _get_dynamic_templates(self) -> Dict[str, Any]:
  54. """
  55. 获取动态模板映射配置(向后兼容方法)
  56. """
  57. return self.dynamic_templates
  58. def create_index(self, index_name: str, mappings: Dict[str, Any] = None, settings: Dict[str, Any] = None) -> bool:
  59. """
  60. 创建索引
  61. """
  62. return self.index_manager.create_index(index_name, mappings, settings)
  63. def insert(self, index_name: str, document: Dict[str, Any], id: str = None, refresh: bool = False) -> bool:
  64. """
  65. 插入单个文档
  66. """
  67. return self.document_manager.insert(index_name, document, id, refresh)
  68. def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]], refresh: bool = False) -> Dict[str, Any]:
  69. """
  70. 批量插入文档
  71. """
  72. return self.document_manager.bulk_insert(index_name, documents, refresh)
  73. def update(self, index_name: str, id: str, update_body: Dict[str, Any], refresh: bool = False) -> bool:
  74. """
  75. 更新文档
  76. """
  77. return self.document_manager.update(index_name, id, update_body, refresh)
  78. def delete(self, index_name: str, id: str, refresh: bool = False) -> bool:
  79. """
  80. 删除文档
  81. """
  82. return self.document_manager.delete(index_name, id, refresh)
  83. def delete_by_query(self, index_name: str, query: Dict[str, Any], refresh: bool = False) -> Dict[str, Any]:
  84. """
  85. 按查询条件删除文档
  86. """
  87. return self.document_manager.delete_by_query(index_name, query, refresh)
  88. def get(self, index_name: str, id: str, fields: List[str] = None) -> Optional[Dict[str, Any]]:
  89. """
  90. 获取单个文档
  91. """
  92. return self.document_manager.get(index_name, id, fields)
  93. def search(self, index_name: str, query: Dict[str, Any], size: int = 10, from_: int = 0,
  94. fields: List[str] = None, highlight: Dict[str, Any] = None) -> Dict[str, Any]:
  95. """
  96. 搜索文档
  97. """
  98. return self.search_manager.search(index_name, query, size, from_, fields, highlight)
  99. def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
  100. size: int = 10, from_: int = 0, fields: List[str] = None,
  101. text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
  102. """
  103. 混合检索:向量相似度检索 + 全文检索
  104. """
  105. return self.search_manager.hybrid_search(index_name, text_query, vector_field, vector,
  106. size, from_, fields, text_weight, vector_weight)
  107. def knn_search(self, index_name: str, vector_field: str, vector: List[float],
  108. k: int = 10, filter_query: Dict[str, Any] = None) -> Dict[str, Any]:
  109. """
  110. 向量相似度检索(k-NN)
  111. """
  112. return self.search_manager.knn_search(index_name, vector_field, vector, k, filter_query)
  113. def close(self):
  114. """
  115. 关闭 Elasticsearch 连接
  116. """
  117. self._es_conn.close()