""" Elasticsearch 连接基础类 """ from typing import List, Dict, Any, Optional from elasticsearch import Elasticsearch from elastic_transport import ConnectionTimeout from utils.decorators import singleton from utils.es.constants import ES_DEFAULT_CONFIG from utils.es.templates import get_dynamic_templates @singleton class ESConnection: """ Elasticsearch 连接管理器 支持: - 单例模式 - 连接池管理 - 基础配置管理 """ def __init__(self, hosts: List[str] = None, **kwargs): """ 初始化 Elasticsearch 连接 Args: hosts: Elasticsearch 主机列表,格式如 ["http://localhost:9200"] **kwargs: 其他 Elasticsearch 客户端配置参数 """ # 合并配置 self.config = {**ES_DEFAULT_CONFIG, **kwargs} self.hosts = hosts or ES_DEFAULT_CONFIG.get("hosts", ["http://localhost:9200"]) # 初始化 Elasticsearch 客户端 self.es = Elasticsearch( hosts=self.hosts, **self.config ) # 动态模板映射 self.dynamic_templates = get_dynamic_templates() def ping(self) -> bool: """ 检查 ES 连接是否正常 Returns: bool: 连接是否正常 """ try: return self.es.ping() except Exception: return False def get_client(self) -> Elasticsearch: """ 获取 ES 客户端实例 Returns: Elasticsearch: ES 客户端实例 """ return self.es def close(self): """ 关闭 Elasticsearch 连接 """ self.es.close()