| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- """
- Elasticsearch 索引管理
- """
- from typing import Dict, Any, Optional
- from utils.es.base import ESConnection
- class IndexManager:
- """
- Elasticsearch 索引管理器
- 负责:
- - 索引创建
- - 索引删除
- - 索引检查
- """
-
- def __init__(self, es_connection: Optional[ESConnection] = None):
- """
- 初始化索引管理器
-
- Args:
- es_connection: ES 连接实例,可选
- """
- self.es_conn = es_connection or ESConnection()
- self.es = self.es_conn.get_client()
-
- def create_index(self, index_name: str, mappings: Dict[str, Any] = None, settings: Dict[str, Any] = None) -> bool:
- """
- 创建索引
-
- Args:
- index_name: 索引名称
- mappings: 自定义映射,会与动态模板合并
- settings: 索引设置
-
- Returns:
- bool: 创建是否成功
- """
- try:
- # 如果索引已存在,返回True
- if self.es.indices.exists(index=index_name):
- return True
-
- # 合并动态模板和自定义映射
- final_mappings = self.es_conn.dynamic_templates.copy()
- if mappings:
- if "dynamic_templates" in mappings:
- final_mappings["dynamic_templates"] += mappings["dynamic_templates"]
- if "properties" in mappings:
- final_mappings["properties"] = mappings["properties"]
-
- body = {}
- if settings:
- body["settings"] = settings
- body["mappings"] = final_mappings
-
- self.es.indices.create(index=index_name, body=body)
- return True
- except Exception as e:
- print(f"创建索引失败: {e}")
- return False
-
- def delete_index(self, index_name: str) -> bool:
- """
- 删除索引
-
- Args:
- index_name: 索引名称
-
- Returns:
- bool: 删除是否成功
- """
- try:
- if self.es.indices.exists(index=index_name):
- self.es.indices.delete(index=index_name)
- return True
- except Exception as e:
- print(f"删除索引失败: {e}")
- return False
-
- def exists(self, index_name: str) -> bool:
- """
- 检查索引是否存在
-
- Args:
- index_name: 索引名称
-
- Returns:
- bool: 索引是否存在
- """
- try:
- return self.es.indices.exists(index=index_name)
- except Exception as e:
- print(f"检查索引存在失败: {e}")
- return False
-
- def get_mappings(self, index_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取索引映射
-
- Args:
- index_name: 索引名称
-
- Returns:
- Dict[str, Any]: 索引映射,不存在则返回None
- """
- try:
- if self.exists(index_name):
- return self.es.indices.get_mapping(index=index_name)
- return None
- except Exception as e:
- print(f"获取索引映射失败: {e}")
- return None
-
- def get_settings(self, index_name: str) -> Optional[Dict[str, Any]]:
- """
- 获取索引设置
-
- Args:
- index_name: 索引名称
-
- Returns:
- Dict[str, Any]: 索引设置,不存在则返回None
- """
- try:
- if self.exists(index_name):
- return self.es.indices.get_settings(index=index_name)
- return None
- except Exception as e:
- print(f"获取索引设置失败: {e}")
- return None
|