""" Elasticsearch 文档管理 """ from typing import List, Dict, Any, Optional from elasticsearch.helpers import bulk, BulkIndexError from elasticsearch.exceptions import NotFoundError from utils.es.base import ESConnection class DocumentManager: """ Elasticsearch 文档管理器 负责: - 文档插入(单条和批量) - 文档更新 - 文档删除(单条和批量) - 文档获取 """ def __init__(self, es_connection: Optional[ESConnection] = None): """ 初始化文档管理器 Args: es_connection: ES 连接实例,可选 """ self.es_conn = es_connection or ESConnection() self.es = self.es_conn.get_client() def insert(self, index_name: str, document: Dict[str, Any], id: str = None, refresh: bool = False) -> bool: """ 插入单个文档 Args: index_name: 索引名称 document: 文档内容 id: 文档ID,可选 refresh: 是否立即刷新 Returns: bool: 插入是否成功 """ try: self.es.index(index=index_name, body=document, id=id, refresh=refresh) return True except Exception as e: print(f"插入文档失败: {e}") return False def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]], refresh: bool = False) -> Dict[str, Any]: """ 批量插入文档 Args: index_name: 索引名称 documents: 文档列表,每个文档可以包含"_id"字段指定ID refresh: 是否立即刷新 Returns: Dict: 包含成功和失败信息的字典 """ try: # 准备批量操作 actions = [] for doc in documents: action = { "_index": index_name, "_source": doc.copy() } # 如果文档包含"_id"字段,将其作为文档ID if "_id" in doc: action["_id"] = doc["_id"] del action["_source"]["_id"] actions.append(action) # 执行批量操作 success, failed = bulk(self.es, actions, refresh=refresh, stats_only=False) return { "success": success, "failed": len(failed) if failed else 0, "errors": failed if failed else [] } except BulkIndexError as e: print(f"批量插入失败: {e}") return { "success": 0, "failed": len(e.errors), "errors": e.errors } except Exception as e: print(f"批量插入失败: {e}") return { "success": 0, "failed": len(documents), "errors": [str(e)] * len(documents) } def update(self, index_name: str, id: str, update_body: Dict[str, Any], refresh: bool = False) -> bool: """ 更新文档 Args: index_name: 索引名称 id: 文档ID update_body: 更新内容,格式如 {"doc": {"field": "value"}} refresh: 是否立即刷新 Returns: bool: 更新是否成功 """ try: self.es.update(index=index_name, id=id, body=update_body, refresh=refresh) return True except NotFoundError: print(f"文档不存在: {id}") return False except Exception as e: print(f"更新文档失败: {e}") return False def delete(self, index_name: str, id: str, refresh: bool = False) -> bool: """ 删除单个文档 Args: index_name: 索引名称 id: 文档ID refresh: 是否立即刷新 Returns: bool: 删除是否成功 """ try: self.es.delete(index=index_name, id=id, refresh=refresh) return True except NotFoundError: print(f"文档不存在: {id}") return False except Exception as e: print(f"删除文档失败: {e}") return False def delete_by_query(self, index_name: str, query: Dict[str, Any], refresh: bool = False) -> Dict[str, Any]: """ 按查询条件删除文档 Args: index_name: 索引名称 query: 查询条件 refresh: 是否立即刷新 Returns: Dict: 删除结果 """ try: result = self.es.delete_by_query(index=index_name, body={"query": query}, refresh=refresh) return { "deleted": result["deleted"], "failed": 0 } except Exception as e: print(f"按条件删除失败: {e}") return { "deleted": 0, "failed": 1, "error": str(e) } def get(self, index_name: str, id: str, fields: List[str] = None) -> Optional[Dict[str, Any]]: """ 获取单个文档 Args: index_name: 索引名称 id: 文档ID fields: 要返回的字段列表,可选 Returns: Dict: 文档内容,不存在则返回None """ try: params = {} if fields: params["_source"] = fields result = self.es.get(index=index_name, id=id, **params) return result["_source"] except NotFoundError: return None except Exception as e: print(f"获取文档失败: {e}") return None