index.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. """
  2. Elasticsearch 索引管理
  3. """
  4. from typing import Dict, Any, Optional
  5. from utils.es.base import ESConnection
  6. class IndexManager:
  7. """
  8. Elasticsearch 索引管理器
  9. 负责:
  10. - 索引创建
  11. - 索引删除
  12. - 索引检查
  13. """
  14. def __init__(self, es_connection: Optional[ESConnection] = None):
  15. """
  16. 初始化索引管理器
  17. Args:
  18. es_connection: ES 连接实例,可选
  19. """
  20. self.es_conn = es_connection or ESConnection()
  21. self.es = self.es_conn.get_client()
  22. def create_index(self, index_name: str, mappings: Dict[str, Any] = None, settings: Dict[str, Any] = None) -> bool:
  23. """
  24. 创建索引
  25. Args:
  26. index_name: 索引名称
  27. mappings: 自定义映射,会与动态模板合并
  28. settings: 索引设置
  29. Returns:
  30. bool: 创建是否成功
  31. """
  32. try:
  33. # 如果索引已存在,返回True
  34. if self.es.indices.exists(index=index_name):
  35. return True
  36. # 合并动态模板和自定义映射
  37. final_mappings = self.es_conn.dynamic_templates.copy()
  38. if mappings:
  39. if "dynamic_templates" in mappings:
  40. final_mappings["dynamic_templates"] += mappings["dynamic_templates"]
  41. if "properties" in mappings:
  42. final_mappings["properties"] = mappings["properties"]
  43. body = {}
  44. if settings:
  45. body["settings"] = settings
  46. body["mappings"] = final_mappings
  47. self.es.indices.create(index=index_name, body=body)
  48. return True
  49. except Exception as e:
  50. print(f"创建索引失败: {e}")
  51. return False
  52. def delete_index(self, index_name: str) -> bool:
  53. """
  54. 删除索引
  55. Args:
  56. index_name: 索引名称
  57. Returns:
  58. bool: 删除是否成功
  59. """
  60. try:
  61. if self.es.indices.exists(index=index_name):
  62. self.es.indices.delete(index=index_name)
  63. return True
  64. except Exception as e:
  65. print(f"删除索引失败: {e}")
  66. return False
  67. def exists(self, index_name: str) -> bool:
  68. """
  69. 检查索引是否存在
  70. Args:
  71. index_name: 索引名称
  72. Returns:
  73. bool: 索引是否存在
  74. """
  75. try:
  76. return self.es.indices.exists(index=index_name)
  77. except Exception as e:
  78. print(f"检查索引存在失败: {e}")
  79. return False
  80. def get_mappings(self, index_name: str) -> Optional[Dict[str, Any]]:
  81. """
  82. 获取索引映射
  83. Args:
  84. index_name: 索引名称
  85. Returns:
  86. Dict[str, Any]: 索引映射,不存在则返回None
  87. """
  88. try:
  89. if self.exists(index_name):
  90. return self.es.indices.get_mapping(index=index_name)
  91. return None
  92. except Exception as e:
  93. print(f"获取索引映射失败: {e}")
  94. return None
  95. def get_settings(self, index_name: str) -> Optional[Dict[str, Any]]:
  96. """
  97. 获取索引设置
  98. Args:
  99. index_name: 索引名称
  100. Returns:
  101. Dict[str, Any]: 索引设置,不存在则返回None
  102. """
  103. try:
  104. if self.exists(index_name):
  105. return self.es.indices.get_settings(index=index_name)
  106. return None
  107. except Exception as e:
  108. print(f"获取索引设置失败: {e}")
  109. return None