document.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. """
  2. Elasticsearch 文档管理
  3. """
  4. from typing import List, Dict, Any, Optional
  5. from elasticsearch.helpers import bulk, BulkIndexError
  6. from elasticsearch.exceptions import NotFoundError
  7. from utils.es.base import ESConnection
class DocumentManager:
    """
    Elasticsearch document manager.

    Responsible for:
    - Inserting documents (single and bulk)
    - Updating documents
    - Deleting documents (single and bulk)
    - Fetching documents
    """
  17. def __init__(self, es_connection: Optional[ESConnection] = None):
  18. """
  19. 初始化文档管理器
  20. Args:
  21. es_connection: ES 连接实例,可选
  22. """
  23. self.es_conn = es_connection or ESConnection()
  24. self.es = self.es_conn.get_client()
  25. def insert(self, index_name: str, document: Dict[str, Any], id: str = None, refresh: bool = False) -> bool:
  26. """
  27. 插入单个文档
  28. Args:
  29. index_name: 索引名称
  30. document: 文档内容
  31. id: 文档ID,可选
  32. refresh: 是否立即刷新
  33. Returns:
  34. bool: 插入是否成功
  35. """
  36. try:
  37. self.es.index(index=index_name, body=document, id=id, refresh=refresh)
  38. return True
  39. except Exception as e:
  40. print(f"插入文档失败: {e}")
  41. return False
  42. def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]], refresh: bool = False) -> Dict[str, Any]:
  43. """
  44. 批量插入文档
  45. Args:
  46. index_name: 索引名称
  47. documents: 文档列表,每个文档可以包含"_id"字段指定ID
  48. refresh: 是否立即刷新
  49. Returns:
  50. Dict: 包含成功和失败信息的字典
  51. """
  52. try:
  53. # 准备批量操作
  54. actions = []
  55. for doc in documents:
  56. action = {
  57. "_index": index_name,
  58. "_source": doc.copy()
  59. }
  60. # 如果文档包含"_id"字段,将其作为文档ID
  61. if "_id" in doc:
  62. action["_id"] = doc["_id"]
  63. del action["_source"]["_id"]
  64. actions.append(action)
  65. # 执行批量操作
  66. success, failed = bulk(self.es, actions, refresh=refresh, stats_only=False)
  67. return {
  68. "success": success,
  69. "failed": len(failed) if failed else 0,
  70. "errors": failed if failed else []
  71. }
  72. except BulkIndexError as e:
  73. print(f"批量插入失败: {e}")
  74. return {
  75. "success": 0,
  76. "failed": len(e.errors),
  77. "errors": e.errors
  78. }
  79. except Exception as e:
  80. print(f"批量插入失败: {e}")
  81. return {
  82. "success": 0,
  83. "failed": len(documents),
  84. "errors": [str(e)] * len(documents)
  85. }
  86. def update(self, index_name: str, id: str, update_body: Dict[str, Any], refresh: bool = False) -> bool:
  87. """
  88. 更新文档
  89. Args:
  90. index_name: 索引名称
  91. id: 文档ID
  92. update_body: 更新内容,格式如 {"doc": {"field": "value"}}
  93. refresh: 是否立即刷新
  94. Returns:
  95. bool: 更新是否成功
  96. """
  97. try:
  98. self.es.update(index=index_name, id=id, body=update_body, refresh=refresh)
  99. return True
  100. except NotFoundError:
  101. print(f"文档不存在: {id}")
  102. return False
  103. except Exception as e:
  104. print(f"更新文档失败: {e}")
  105. return False
  106. def delete(self, index_name: str, id: str, refresh: bool = False) -> bool:
  107. """
  108. 删除单个文档
  109. Args:
  110. index_name: 索引名称
  111. id: 文档ID
  112. refresh: 是否立即刷新
  113. Returns:
  114. bool: 删除是否成功
  115. """
  116. try:
  117. self.es.delete(index=index_name, id=id, refresh=refresh)
  118. return True
  119. except NotFoundError:
  120. print(f"文档不存在: {id}")
  121. return False
  122. except Exception as e:
  123. print(f"删除文档失败: {e}")
  124. return False
  125. def delete_by_query(self, index_name: str, query: Dict[str, Any], refresh: bool = False) -> Dict[str, Any]:
  126. """
  127. 按查询条件删除文档
  128. Args:
  129. index_name: 索引名称
  130. query: 查询条件
  131. refresh: 是否立即刷新
  132. Returns:
  133. Dict: 删除结果
  134. """
  135. try:
  136. result = self.es.delete_by_query(index=index_name, body={"query": query}, refresh=refresh)
  137. return {
  138. "deleted": result["deleted"],
  139. "failed": 0
  140. }
  141. except Exception as e:
  142. print(f"按条件删除失败: {e}")
  143. return {
  144. "deleted": 0,
  145. "failed": 1,
  146. "error": str(e)
  147. }
  148. def get(self, index_name: str, id: str, fields: List[str] = None) -> Optional[Dict[str, Any]]:
  149. """
  150. 获取单个文档
  151. Args:
  152. index_name: 索引名称
  153. id: 文档ID
  154. fields: 要返回的字段列表,可选
  155. Returns:
  156. Dict: 文档内容,不存在则返回None
  157. """
  158. try:
  159. params = {}
  160. if fields:
  161. params["_source"] = fields
  162. result = self.es.get(index=index_name, id=id, **params)
  163. return result["_source"]
  164. except NotFoundError:
  165. return None
  166. except Exception as e:
  167. print(f"获取文档失败: {e}")
  168. return None