base.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. """
  2. Elasticsearch 连接基础类
  3. """
  4. from typing import List, Dict, Any, Optional
  5. from elasticsearch import Elasticsearch
  6. from elastic_transport import ConnectionTimeout
  7. from utils.decorators import singleton
  8. from utils.es.constants import ES_DEFAULT_CONFIG
  9. from utils.es.templates import get_dynamic_templates
  10. @singleton
  11. class ESConnection:
  12. """
  13. Elasticsearch 连接管理器
  14. 支持:
  15. - 单例模式
  16. - 连接池管理
  17. - 基础配置管理
  18. """
  19. def __init__(self, hosts: List[str] = None, **kwargs):
  20. """
  21. 初始化 Elasticsearch 连接
  22. Args:
  23. hosts: Elasticsearch 主机列表,格式如 ["http://localhost:9200"]
  24. **kwargs: 其他 Elasticsearch 客户端配置参数
  25. """
  26. # 合并配置
  27. self.config = {**ES_DEFAULT_CONFIG, **kwargs}
  28. self.hosts = hosts or ES_DEFAULT_CONFIG.get("hosts", ["http://localhost:9200"])
  29. # 初始化 Elasticsearch 客户端
  30. self.es = Elasticsearch(
  31. hosts=self.hosts,
  32. **self.config
  33. )
  34. # 动态模板映射
  35. self.dynamic_templates = get_dynamic_templates()
  36. def ping(self) -> bool:
  37. """
  38. 检查 ES 连接是否正常
  39. Returns:
  40. bool: 连接是否正常
  41. """
  42. try:
  43. return self.es.ping()
  44. except Exception:
  45. return False
  46. def get_client(self) -> Elasticsearch:
  47. """
  48. 获取 ES 客户端实例
  49. Returns:
  50. Elasticsearch: ES 客户端实例
  51. """
  52. return self.es
  53. def close(self):
  54. """
  55. 关闭 Elasticsearch 连接
  56. """
  57. self.es.close()