| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- """
- Elasticsearch 文档管理
- """
- from typing import List, Dict, Any, Optional
- from elasticsearch.helpers import bulk, BulkIndexError
- from elasticsearch.exceptions import NotFoundError
- from services.utils.es.base import ESConnection
class DocumentManager:
    """
    Elasticsearch document manager.

    Responsibilities:
    - inserting documents (single and bulk)
    - updating documents
    - deleting documents (single, and by query)
    - fetching documents

    Every public method catches exceptions, prints a message, and reports the
    failure through its return value instead of raising.
    """

    def __init__(self, es_connection: Optional["ESConnection"] = None):
        """
        Initialize the document manager.

        Args:
            es_connection: ES connection wrapper to use. When omitted (or falsy),
                a new ``ESConnection()`` is created.
        """
        self.es_conn = es_connection or ESConnection()
        # All operations below go through the client returned by the wrapper.
        self.es = self.es_conn.get_client()

    def insert(self, index_name: str, document: Dict[str, Any], id: Optional[str] = None,
               refresh: bool = False) -> bool:
        """
        Index a single document.

        Args:
            index_name: Target index name.
            document: Document body.
            id: Optional document ID, forwarded as-is (None is passed through).
            refresh: Forwarded as ``refresh`` to the index call (whether to
                refresh immediately).

        Returns:
            True if the index call returned without raising; False otherwise
            (the error is printed).
        """
        try:
            self.es.index(index=index_name, body=document, id=id, refresh=refresh)
            return True
        except Exception as e:
            print(f"插入文档失败: {e}")
            return False

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]],
                    refresh: bool = False) -> Dict[str, Any]:
        """
        Index many documents with the ``elasticsearch.helpers.bulk`` helper.

        Args:
            index_name: Target index name.
            documents: Documents to index. A document may contain an ``"_id"``
                key; it is moved out of the body and used as the document ID.
                The caller's dicts are not mutated (each is shallow-copied).
            refresh: Forwarded as ``refresh`` to the bulk helper.

        Returns:
            Dict with:
              - ``"success"``: number of actions the helper reported as successful;
              - ``"failed"``: number of failed actions;
              - ``"errors"``: per-action error items returned by the helper, or,
                if the whole call raised, ``str(exception)`` repeated once per
                document.
        """
        try:
            actions = []
            for doc in documents:
                source = doc.copy()  # shallow copy: never mutate the caller's document
                action = {"_index": index_name, "_source": source}
                if "_id" in source:
                    # Move "_id" from the body into the action metadata.
                    action["_id"] = source.pop("_id")
                actions.append(action)

            # raise_on_error=False makes the helper return per-document failures
            # instead of raising BulkIndexError, so successes in a partially
            # failed batch are still counted correctly.
            success, errors = bulk(self.es, actions, refresh=refresh,
                                   stats_only=False, raise_on_error=False)
            errors = list(errors) if errors else []
            return {
                "success": success,
                "failed": len(errors),
                "errors": errors,
            }
        except BulkIndexError as e:
            # Not expected with raise_on_error=False; kept as a defensive fallback.
            print(f"批量插入失败: {e}")
            return {
                "success": 0,
                "failed": len(e.errors),
                "errors": e.errors,
            }
        except Exception as e:
            print(f"批量插入失败: {e}")
            return {
                "success": 0,
                "failed": len(documents),
                "errors": [str(e)] * len(documents),
            }

    def update(self, index_name: str, id: str, update_body: Dict[str, Any],
               refresh: bool = False) -> bool:
        """
        Update a document.

        Args:
            index_name: Index name.
            id: Document ID.
            update_body: Update request body, e.g. ``{"doc": {"field": "value"}}``.
            refresh: Forwarded as ``refresh`` to the update call.

        Returns:
            True if the update call returned without raising; False if the
            document was not found or any other error occurred (printed).
        """
        try:
            self.es.update(index=index_name, id=id, body=update_body, refresh=refresh)
            return True
        except NotFoundError:
            print(f"文档不存在: {id}")
            return False
        except Exception as e:
            print(f"更新文档失败: {e}")
            return False

    def delete(self, index_name: str, id: str, refresh: bool = False) -> bool:
        """
        Delete a single document.

        Args:
            index_name: Index name.
            id: Document ID.
            refresh: Forwarded as ``refresh`` to the delete call.

        Returns:
            True if the delete call returned without raising; False if the
            document was not found or any other error occurred (printed).
        """
        try:
            self.es.delete(index=index_name, id=id, refresh=refresh)
            return True
        except NotFoundError:
            print(f"文档不存在: {id}")
            return False
        except Exception as e:
            print(f"删除文档失败: {e}")
            return False

    def delete_by_query(self, index_name: str, query: Dict[str, Any],
                        refresh: bool = False) -> Dict[str, Any]:
        """
        Delete all documents matching a query.

        Args:
            index_name: Index name.
            query: Query clause; sent as ``{"query": query}``.
            refresh: Forwarded as ``refresh`` to the delete-by-query call.

        Returns:
            On a returned response: ``{"deleted": <response "deleted">,
            "failed": <number of entries in the response "failures" list>}``.
            If the call raises: ``{"deleted": 0, "failed": 1, "error": str(e)}``.
        """
        try:
            result = self.es.delete_by_query(index=index_name, body={"query": query}, refresh=refresh)
            # The response can carry per-document failures; count them rather
            # than always reporting zero.
            failures = result["failures"] if "failures" in result else None
            return {
                "deleted": result["deleted"],
                "failed": len(failures or []),
            }
        except Exception as e:
            print(f"按条件删除失败: {e}")
            return {
                "deleted": 0,
                "failed": 1,
                "error": str(e),
            }

    def get(self, index_name: str, id: str,
            fields: Optional[List[str]] = None) -> Optional[Dict[str, Any]]:
        """
        Fetch a single document's source.

        Args:
            index_name: Index name.
            id: Document ID.
            fields: Optional list of source fields to return; when empty or
                None, no ``_source`` filter is sent.

        Returns:
            The ``_source`` of the document, or None if it was not found or any
            other error occurred (printed).
        """
        try:
            params = {"_source": fields} if fields else {}
            result = self.es.get(index=index_name, id=id, **params)
            return result["_source"]
        except NotFoundError:
            return None
        except Exception as e:
            print(f"获取文档失败: {e}")
            return None
|