""" Elasticsearch 索引管理 """ from typing import Dict, Any, Optional from utils.es.base import ESConnection class IndexManager: """ Elasticsearch 索引管理器 负责: - 索引创建 - 索引删除 - 索引检查 """ def __init__(self, es_connection: Optional[ESConnection] = None): """ 初始化索引管理器 Args: es_connection: ES 连接实例,可选 """ self.es_conn = es_connection or ESConnection() self.es = self.es_conn.get_client() def create_index(self, index_name: str, mappings: Dict[str, Any] = None, settings: Dict[str, Any] = None) -> bool: """ 创建索引 Args: index_name: 索引名称 mappings: 自定义映射,会与动态模板合并 settings: 索引设置 Returns: bool: 创建是否成功 """ try: # 如果索引已存在,返回True if self.es.indices.exists(index=index_name): return True # 合并动态模板和自定义映射 final_mappings = self.es_conn.dynamic_templates.copy() if mappings: if "dynamic_templates" in mappings: final_mappings["dynamic_templates"] += mappings["dynamic_templates"] if "properties" in mappings: final_mappings["properties"] = mappings["properties"] body = {} if settings: body["settings"] = settings body["mappings"] = final_mappings self.es.indices.create(index=index_name, body=body) return True except Exception as e: print(f"创建索引失败: {e}") return False def delete_index(self, index_name: str) -> bool: """ 删除索引 Args: index_name: 索引名称 Returns: bool: 删除是否成功 """ try: if self.es.indices.exists(index=index_name): self.es.indices.delete(index=index_name) return True except Exception as e: print(f"删除索引失败: {e}") return False def exists(self, index_name: str) -> bool: """ 检查索引是否存在 Args: index_name: 索引名称 Returns: bool: 索引是否存在 """ try: return self.es.indices.exists(index=index_name) except Exception as e: print(f"检查索引存在失败: {e}") return False def get_mappings(self, index_name: str) -> Optional[Dict[str, Any]]: """ 获取索引映射 Args: index_name: 索引名称 Returns: Dict[str, Any]: 索引映射,不存在则返回None """ try: if self.exists(index_name): return self.es.indices.get_mapping(index=index_name) return None except Exception as e: print(f"获取索引映射失败: {e}") return None def get_settings(self, index_name: str) -> Optional[Dict[str, Any]]: """ 获取索引设置 Args: index_name: 索引名称 Returns: Dict[str, Any]: 索引设置,不存在则返回None """ try: if self.exists(index_name): return self.es.indices.get_settings(index=index_name) return None except Exception as e: print(f"获取索引设置失败: {e}") return None