# Elasticsearch 连接器技术文档 ## 概述 本文档详细解析 `es_conn.py` 文件的设计思路与具体实现。该文件实现了一个基于 Elasticsearch 的数据库连接器,提供完整的 CRUD 操作支持,并具备混合搜索(文本+向量)、批量处理、重试机制等高级功能。 --- ## 文件结构总览 ### 1. 版权声明与许可 (1-15行) 文件开头的 Apache License 2.0 声明表明这是一个开源项目,遵循 Apache 开源协议。该协议允许自由使用、修改和分发代码,但需要保留原始版权声明和许可证文本。 ### 2. 导入模块分析 (17-28行) ```python import re # 正则表达式处理 import json # JSON 序列化 import time # 时间控制 import copy # 深拷贝操作 from elasticsearch_dsl import UpdateByQuery, Q, Search # ES DSL 查询构建器 from elastic_transport import ConnectionTimeout # 连接超时异常 from common.decorator import singleton # 单例装饰器 from common.doc_store.doc_store_base import MatchTextExpr, OrderByExpr, MatchExpr, MatchDenseExpr, FusionExpr # 查询表达式基类 from common.doc_store.es_conn_base import ESConnectionBase # 基础连接类 from common.float_utils import get_float # 浮点数处理工具 from common.constants import PAGERANK_FLD, TAG_FLD # 常量定义 ``` **设计思路**: - 使用 `elasticsearch_dsl` 库构建类型安全的查询 DSL(领域特定语言) - 通过 `singleton` 装饰器确保全局只有一个连接实例,避免连接池浪费 - 自定义异常类型 `ConnectionTimeout` 用于优雅处理超时情况 - 导入基类 `ESConnectionBase` 遵循开放/封闭原则,便于扩展 ### 3. 常量定义 (30行) ```python ATTEMPT_TIME = 2 ``` **设计思路**:定义重试次数为 2 次,在可靠性和性能之间取得平衡。网络操作可能因临时故障失败,轻量级重试机制可提高成功率。 --- ## 核心类设计 ### ESConnection 类 (33-375行) #### 类的继承关系 ```python @singleton class ESConnection(ESConnectionBase): ``` **设计思路**: - 继承自 `ESConnectionBase` 抽象基类,遵循依赖倒置原则 - 使用 `@singleton` 装饰器确保单例模式,整个应用生命周期内只有一个实例 - 单例模式避免了频繁创建/销毁连接的开销,提高了资源利用效率 --- ## 搜索功能详解 (39-169行) ### 方法签名 ```python def search( self, select_fields: list[str], highlight_fields: list[str], condition: dict, match_expressions: list[MatchExpr], order_by: OrderByExpr, offset: int, limit: int, index_names: str | list[str], knowledgebase_ids: list[str], agg_fields: list[str] | None = None, rank_feature: dict | None = None ): ``` **参数设计思路**: - `select_fields` 和 `highlight_fields` 分离了字段选择和高亮显示的关注点 - `condition` 使用字典存储查询条件,提供了灵活的条件组合方式 - `match_expressions` 使用表达式模式,支持多种匹配类型的组合 - `knowledgebase_ids` 支持多知识库搜索场景 ### 索引名称处理 (55-58行) ```python if isinstance(index_names, str): index_names = index_names.split(",") assert isinstance(index_names, list) and len(index_names) > 0 assert "_id" not in condition" ``` **设计思路**: - 兼容字符串和列表两种输入形式,提高了 API 的易用性 - 使用断言确保参数的有效性,在开发阶段快速捕获错误 - 禁止直接使用 `_id` 作为查询条件,防止潜在的注入风险 ### 查询条件构建 (60-78行) ```python bool_query = Q("bool", must=[]) condition["kb_id"] = knowledgebase_ids for k, v in condition.items(): if k == "available_int": if v == 0: bool_query.filter.append(Q("range", available_int={"lt": 1})) else: bool_query.filter.append( Q("bool", must_not=Q("range", available_int={"lt": 1}"))) continue if not v: continue if isinstance(v, list): bool_query.filter.append(Q("terms", **{k: v})) elif isinstance(v, str) or isinstance(v, int): bool_query.filter.append(Q("term", **{k: v})) else: raise Exception(...) ``` **设计思路**: - 使用 Elasticsearch 的 bool 查询作为基础,灵活组合 must/filter/must_not 等条件 - 特殊处理 `available_int` 字段,实现可用性过滤逻辑 - 根据值的类型自动选择 `terms`(列表)或 `term`(单值)查询方式 - 跳过空值条件,避免不必要的查询开销 ### 混合搜索权重计算 (80-89行) ```python s = Search() vector_similarity_weight = 0.5 for m in match_expressions: if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params: assert len(match_expressions) == 3 and isinstance(match_expressions[0], MatchTextExpr) and isinstance( match_expressions[1], MatchDenseExpr) and isinstance( match_expressions[2], FusionExpr) weights = m.fusion_params["weights"] vector_similarity_weight = get_float(weights.split(",")[1]) ``` **设计思路**: - 支持混合搜索场景,同时处理文本匹配和向量相似度搜索 - 从权重参数中解析向量搜索的权重比例 - 使用断言确保混合搜索的表达式组合符合预期模式 ### 文本匹配处理 (91-99行) ```python for m in match_expressions: if isinstance(m, MatchTextExpr): minimum_should_match = m.extra_options.get("minimum_should_match", 0.0) if isinstance(minimum_should_match, float): minimum_should_match = str(int(minimum_should_match * 100)) + "%" bool_query.must.append(Q("query_string", fields=m.fields, type="best_fields", query=m.matching_text, minimum_should_match=minimum_should_match, boost=1)) bool_query.boost = 1.0 - vector_similarity_weight ``` **设计思路**: - 使用 `query_string` 查询支持复杂的查询语法 - `best_fields` 类型在多字段搜索中返回最佳匹配字段的分数 - 将浮点数百分比转换为 Elasticsearch 要求的字符串格式 - 根据混合搜索权重调整文本查询的 boost 值 ### 向量搜索处理 (101-112行) ```python elif isinstance(m, MatchDenseExpr): assert (bool_query is not None) similarity = 0.0 if "similarity" in m.extra_options: similarity = m.extra_options["similarity"] s = s.knn(m.vector_column_name, m.topn, m.topn * 2, query_vector=list(m.embedding_data), filter=bool_query.to_dict(), similarity=similarity, ) ``` **设计思路**: - 使用 Elasticsearch 的 k-NN(最近邻)搜索功能进行向量相似度匹配 - `topn * 2` 的设计是为了在应用其他过滤条件后仍能返回足够的候选结果 - 将查询过滤器传递给 k-NN 搜索,实现向量搜索与过滤条件的组合 - 支持自定义相似度阈值 ### 排名特征增强 (114-118行) ```python if bool_query and rank_feature: for fld, sc in rank_feature.items(): if fld != PAGERANK_FLD: fld = f"{TAG_FLD}.{fld}" bool_query.should.append(Q("rank_feature", field=fld, linear={}, boost=sc)) ``` **设计思路**: - 使用 `rank_feature` 查询提升特定特征的权重 - 对非 pagerank 字段添加 `tag.` 前缀,符合数据模型的命名规范 - `should` 查询不会排除结果,只会调整相关性分数 ### 高亮与排序处理 (120-137行) ```python for field in highlight_fields: s = s.highlight(field) if order_by: orders = list() for field, order in order_by.fields: order = "asc" if order == 0 else "desc" if field in ["page_num_int", "top_int"]: order_info = {"order": order, "unmapped_type": "float", "mode": "avg", "numeric_type": "double"} elif field.endswith("_int") or field.endswith("_flt"): order_info = {"order": order, "unmapped_type": "float"} else: order_info = {"order": order, "unmapped_type": "text"} orders.append({field: order_info}) s = s.sort(*orders) ``` **设计思路**: - 分离高亮配置和排序配置的关注点 - 根据字段类型设置不同的 `unmapped_type`,避免排序时因字段不存在而失败 - 数值类型字段特殊处理,支持正确的数值排序 ### 聚合处理 (138-140行) ```python if agg_fields: for fld in agg_fields: s.aggs.bucket(f'aggs_{fld}', 'terms', field=fld, size=1000000) ``` **设计思路**: - 使用 `terms` 聚合统计字段值的分布 - `size=1000000` 设置较大的聚合桶数量,确保结果完整性 ### 分页处理 (142-143行) ```python if limit > 0: s = s[offset:offset + limit] ``` **设计思路**: - Elasticsearch DSL 的切片语法简洁直观 - 限制只有 limit > 0 时才应用分页,支持返回所有记录的场景 ### 重试机制与超时处理 (147-169行) ```python for i in range(ATTEMPT_TIME): try: res = self.es.search(index=index_names, body=q, timeout="600s", track_total_hits=True, _source=True) if str(res.get("timed_out", "")).lower() == "true": raise Exception("Es Timeout.") self.logger.debug(f"ESConnection.search {str(index_names)} res: " + str(res)) return res except ConnectionTimeout: self.logger.exception("ES request timeout") self._connect() continue except Exception as e: self.logger.exception(f"ESConnection.search {str(index_names)} query: " + str(q) + str(e)) raise e self.logger.error(f"ESConnection.search timeout for {ATTEMPT_TIME} times!") raise Exception("ESConnection.search timeout.") ``` **设计思路**: - 设置 600 秒超时时间,适应复杂查询场景 - `track_total_hits=True` 确保返回准确的命中总数 - ConnectionTimeout 异常时重连后重试,最大化容错能力 - 非超时异常直接抛出,便于上层处理 - 所有重试失败后抛出明确的超时异常 --- ## 插入功能详解 (171-207行) ### 方法签名 ```python def insert(self, documents: list[dict], index_name: str, knowledgebase_id: str = None) -> list[str]: ``` **设计思路**: - `documents` 参数接收字典列表,支持批量插入 - `knowledgebase_id` 参数将知识库 ID 注入每条文档,实现数据隔离 ### 批量操作构建 (173-182行) ```python operations = [] for d in documents: assert "_id" not in d assert "id" in d d_copy = copy.deepcopy(d) d_copy["kb_id"] = knowledgebase_id meta_id = d_copy.pop("id", "") operations.append( {"index": {"_index": index_name, "_id": meta_id}}) operations.append(d_copy) ``` **设计思路**: - 使用 Elasticsearch 的批量 API(Bulk API)提高插入效率 - 禁止使用 `_id` 作为字段名,防止与 ES 内部 ID 冲突 - 要求每条文档必须有 `id` 字段,作为 ES 文档 ID - 使用深拷贝避免修改原始数据 - 将 `id` 字段重命名为 ES 的 `_id`,并添加 `kb_id` 字段 ### 批量执行与错误处理 (184-207行) ```python for _ in range(ATTEMPT_TIME): try: r = self.es.bulk(index=index_name, operations=operations, refresh=False, timeout="60s") if re.search(r"False", str(r["errors"]), re.IGNORECASE): return res for item in r["items"]: for action in ["create", "delete", "index", "update"]: if action in item and "error" in item[action]: res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"])) return res except ConnectionTimeout: self.logger.exception("ES request timeout") time.sleep(3) self._connect() continue except Exception as e: res.append(str(e)) self.logger.warning("ESConnection.insert got exception: " + str(e)) return res ``` **设计思路**: - `refresh=False` 提高写入性能,不要求立即刷新到磁盘 - 返回值为错误列表,空列表表示全部成功 - 遍历批量响应中的每个操作,收集错误信息 - 超时后等待 3 秒再重试,给系统恢复时间 --- ## 更新功能详解 (209-301行) ### 方法签名 ```python def update(self, condition: dict, new_value: dict, index_name: str, knowledgebase_id: str) -> bool: ``` **设计思路**: - 支持两种更新场景:指定 ID 的单文档更新和条件匹配的多文档更新 - 返回布尔值表示更新是否成功 ### 单文档更新 (213-233行) ```python if "id" in condition and isinstance(condition["id"], str): chunk_id = condition["id"] for i in range(ATTEMPT_TIME): for k in doc.keys(): if "feas" != k.split("_")[-1]: continue try: self.es.update(index=index_name, id=chunk_id, script=f"ctx._source.remove(\"{k}\");") except Exception: self.logger.exception(...) try: self.es.update(index=index_name, id=chunk_id, doc=doc) return True except Exception as e: self.logger.exception(...) break return False ``` **设计思路**: - 对于以 `feas` 结尾的字段,使用脚本执行删除操作 - 这种设计支持特殊字段的清理逻辑 - 主更新操作使用简单的文档更新方式 ### 多文档更新条件构建 (236-249行) ```python bool_query = Q("bool") for k, v in condition.items(): if not isinstance(k, str) or not v: continue if k == "exists": bool_query.filter.append(Q("exists", field=v)) continue if isinstance(v, list): bool_query.filter.append(Q("terms", **{k: v})) elif isinstance(v, str) or isinstance(v, int): bool_query.filter.append(Q("term", **{k: v})) else: raise Exception(...) ``` **设计思路**: - 使用 bool 查询构建复杂条件 - 支持 `exists` 查询检查字段是否存在 - 统一的条件处理逻辑,支持多种值类型 ### 脚本生成器 (250-280行) ```python scripts = [] params = {} for k, v in new_value.items(): if k == "remove": if isinstance(v, str): scripts.append(f"ctx._source.remove('{v}');") if isinstance(v, dict): for kk, vv in v.items(): scripts.append(f"int i=ctx._source.{kk}.indexOf(params.p_{kk});ctx._source.{kk}.remove(i);") params[f"p_{kk}"] = vv continue if k == "add": if isinstance(v, dict): for kk, vv in v.items(): scripts.append(f"ctx._source.{kk}.add(params.pp_{kk});") params[f"pp_{kk}"] = vv.strip() continue if (not isinstance(k, str) or not v) and k != "available_int": continue if isinstance(v, str): v = re.sub(r"(['\n\r]|\\.)", " ", v) params[f"pp_{k}"] = v scripts.append(f"ctx._source.{k}=params.pp_{k};") elif isinstance(v, int) or isinstance(v, float): scripts.append(f"ctx._source.{k}={v};") elif isinstance(v, list): scripts.append(f"ctx._source.{k}=params.pp_{k};") params[f"pp_{k}"] = json.dumps(v, ensure_ascii=False) else: raise Exception(...) ``` **设计思路**: - 使用 Elasticsearch 的 Painless 脚本语言实现复杂更新逻辑 - `remove` 操作支持单个字段删除和数组元素删除 - `add` 操作支持向数组添加元素 - 字符串值经过转义处理,防止脚本注入 - 数组值使用 JSON 序列化后传入脚本 - 使用参数化方式传递值,避免字符串拼接风险 ### UpdateByQuery 执行 (281-301行) ```python ubq = UpdateByQuery( index=index_name).using( self.es).query(bool_query) ubq = ubq.script(source="".join(scripts), params=params) ubq = ubq.params(refresh=True) ubq = ubq.params(slices=5) ubq = ubq.params(conflicts="proceed") for _ in range(ATTEMPT_TIME): try: _ = ubq.execute() return True except ConnectionTimeout: self.logger.exception("ES request timeout") time.sleep(3) self._connect() continue except Exception as e: self.logger.error("ESConnection.update got exception: " + str(e) + "\n".join(scripts)) break return False ``` **设计思路**: - `UpdateByQuery` 用于批量更新符合条件的文档 - `refresh=True` 确保更新后立即对搜索可见 - `slices=5` 启用并行处理,提高大数据量更新的性能 - `conflicts="proceed"` 允许在文档被其他操作修改时继续执行 --- ## 删除功能详解 (303-349行) ### 方法签名 ```python def delete(self, condition: dict, index_name: str, knowledgebase_id: str) -> int: ``` **设计思路**: - 返回删除的文档数量,便于调用者了解操作结果 - 条件删除场景支持灵活的数据清理操作 ### 条件处理 (304-331行) ```python assert "_id" not in condition condition["kb_id"] = knowledgebase_id if "id" in condition: chunk_ids = condition["id"] if not isinstance(chunk_ids, list): chunk_ids = [chunk_ids] if not chunk_ids: qry = Q("match_all") else: qry = Q("ids", values=chunk_ids) else: qry = Q("bool") for k, v in condition.items(): if k == "exists": qry.filter.append(Q("exists", field=v)) elif k == "must_not": if isinstance(v, dict): for kk, vv in v.items(): if kk == "exists": qry.must_not.append(Q("exists", field=vv)) elif isinstance(v, list): qry.must.append(Q("terms", **{k: v})) elif isinstance(v, str) or isinstance(v, int): qry.must.append(Q("term", **{k: v})) else: raise Exception("Condition value must be int, str or list.") ``` **设计思路**: - `id` 参数支持单个 ID 或 ID 列表,灵活处理删除范围 - 空 ID 列表时使用 `match_all` 删除全部文档 - 使用 `ids` 查询高效删除指定文档 - 支持 `exists` 和 `must_not` 高级条件 ### 删除执行 (333-349行) ```python self.logger.debug("ESConnection.delete query: " + json.dumps(qry.to_dict())) for _ in range(ATTEMPT_TIME): try: res = self.es.delete_by_query( index=index_name, body=Search().query(qry).to_dict(), refresh=True) return res["deleted"] except ConnectionTimeout: self.logger.exception("ES request timeout") time.sleep(3) self._connect() continue except Exception as e: self.logger.warning("ESConnection.delete got exception: " + str(e)) if re.search(r"(not_found)", str(e), re.IGNORECASE): return 0 return 0 ``` **设计思路**: - 使用 `delete_by_query` API 批量删除文档 - `refresh=True` 确保删除操作立即生效 - 返回 `deleted` 字段表示实际删除的文档数量 - 忽略 "not_found" 异常,因为删除不存在的文档是预期行为 --- ## 辅助功能详解 (355-375行) ### get_fields 方法 ```python def get_fields(self, res, fields: list[str]) -> dict[str, dict]: res_fields = {} if not fields: return {} for d in self._get_source(res): m = {n: d.get(n) for n in fields if d.get(n) is not None} for n, v in m.items(): if isinstance(v, list): m[n] = v continue if n == "available_int" and isinstance(v, (int, float)): m[n] = v continue if not isinstance(v, str): m[n] = str(v) # if n.find("tks") > 0: # m[n] = remove_redundant_spaces(m[n]) if m: res_fields[d["id"]] = m return res_fields ``` **设计思路**: - 将搜索结果转换为以文档 ID 为键的字典结构,便于后续处理 - 只提取指定的字段,过滤掉不需要的数据 - 保留列表类型和数值类型的字段值 - 将其他类型转换为字符串,保证输出的一致性 - 使用 `_get_source` 方法提取搜索结果的源文档(由基类提供) --- ## 设计模式总结 ### 1. 单例模式 使用 `@singleton` 装饰器确保全局唯一的连接实例,避免资源浪费。 ### 2. 策略模式 通过 `match_expressions` 参数支持多种匹配策略(文本、向量、融合)的组合。 ### 3. 模板方法模式 继承自 `ESConnectionBase`,遵循基类定义的接口规范。 ### 4. 责任链模式 重试机制将处理责任依次传递给自身,实现优雅降级。 ### 5. 建造者模式 使用 Elasticsearch DSL 构建复杂的查询对象。 --- ## 性能优化策略 ### 1. 批量操作 `insert` 方法使用 Bulk API 批量插入,减少网络往返开销。 ### 2. 并行处理 `UpdateByQuery` 的 `slices=5` 参数启用多线程并行处理。 ### 3. 延迟刷新 `refresh=False` 避免每次写入后立即刷新,提高写入性能。 ### 4. 超时控制 600 秒超时适应复杂查询,避免无限期等待。 ### 5. 条件跳过 空值条件自动跳过,减少不必要的查询开销。 --- ## 错误处理机制 ### 1. 连接超时 检测到 `ConnectionTimeout` 异常后重连并重试。 ### 2. 批量操作错误 收集批量操作中每个文档的错误信息。 ### 3. 参数验证 使用断言确保关键参数的有效性。 ### 4. 日志记录 所有异常和关键操作都有日志记录,便于问题排查。 --- ## 扩展性考虑 ### 1. 表达式系统 `MatchExpr` 系列类支持扩展新的匹配类型。 ### 2. 查询构建器 继承 `ESConnectionBase` 可实现其他数据库的连接器。 ### 3. 配置外部化 超时、重试次数等参数可配置,适应不同环境需求。