test_es_conn.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import json
  4. from services.utils.es_conn import ESConnection
  5. def test_es_connection():
  6. """
  7. 测试 Elasticsearch 连接和基本功能
  8. """
  9. try:
  10. # 初始化连接
  11. print("正在初始化 Elasticsearch 连接...")
  12. es = ESConnection(hosts=["http://localhost:9200"])
  13. print("连接成功!")
  14. # 测试索引创建
  15. index_name = "test_ragflow_index"
  16. print(f"\n正在创建索引: {index_name}")
  17. success = es.create_index(index_name)
  18. if success:
  19. print(f"索引 {index_name} 创建成功!")
  20. else:
  21. print(f"索引 {index_name} 创建失败!")
  22. return False
  23. # 测试文档插入
  24. test_doc = {
  25. "title": "测试文档",
  26. "content": "这是一个用于测试 Elasticsearch 连接的文档",
  27. "content_tks": "这 是 一个 用于 测试 Elasticsearch 连接 的 文档",
  28. "vector_768_vec": [0.1] * 768,
  29. "created_at": "2024-01-01 00:00:00",
  30. "count_int": 10,
  31. "importance_flt": 0.8,
  32. "tags_kwd": ["测试", "elasticsearch"],
  33. "kb_id": "test_kb_123"
  34. }
  35. print("\n正在插入测试文档...")
  36. insert_success = es.insert(index_name, test_doc)
  37. if insert_success:
  38. print("文档插入成功!")
  39. else:
  40. print("文档插入失败!")
  41. return False
  42. # 测试批量插入
  43. test_docs = []
  44. for i in range(3):
  45. doc = {
  46. "title": f"批量测试文档 {i}",
  47. "content": f"这是第 {i} 个批量测试文档",
  48. "content_tks": f"这是 第 {i} 个 批量 测试 文档",
  49. "vector_768_vec": [0.1] * 768,
  50. "created_at": "2024-01-01 00:00:00",
  51. "count_int": i,
  52. "importance_flt": 0.5 + i * 0.1,
  53. "tags_kwd": ["批量", "测试"],
  54. "kb_id": "test_kb_123"
  55. }
  56. test_docs.append(doc)
  57. print("\n正在批量插入测试文档...")
  58. bulk_result = es.bulk_insert(index_name, test_docs)
  59. print(f"批量插入结果: {bulk_result}")
  60. # 测试全文检索
  61. print("\n正在测试全文检索...")
  62. text_query = {
  63. "match": {
  64. "content": "测试"
  65. }
  66. }
  67. text_result = es.search(index_name, text_query, size=5)
  68. print(f"全文检索结果: {text_result['hits']['total']} 个命中")
  69. # 测试向量检索
  70. print("\n正在测试向量检索...")
  71. vector = [0.1] * 768
  72. vector_result = es.knn_search(
  73. index_name=index_name,
  74. vector_field="vector_768_vec",
  75. vector=vector,
  76. k=3
  77. )
  78. print(f"向量检索结果: {vector_result['hits']['total']} 个命中")
  79. # 测试混合检索
  80. print("\n正在测试混合检索...")
  81. hybrid_result = es.hybrid_search(
  82. index_name=index_name,
  83. text_query="测试",
  84. vector_field="vector_768_vec",
  85. vector=vector,
  86. size=5
  87. )
  88. print(f"混合检索结果: {hybrid_result['hits']['total']} 个命中")
  89. # 打印命中的文档
  90. print("\n混合检索命中的文档:")
  91. for hit in hybrid_result['hits']['hits']:
  92. doc = hit['_source']
  93. print(f" - 标题: {doc['title']}, 相似度分数: {hit['_score']:.4f}")
  94. # 测试文档删除
  95. print(f"\n正在删除索引: {index_name}")
  96. es.es.indices.delete(index=index_name, ignore=[400, 404])
  97. print(f"索引 {index_name} 删除成功!")
  98. # 关闭连接
  99. es.close()
  100. print("\n所有测试完成!")
  101. return True
  102. except Exception as e:
  103. print(f"测试失败: {e}")
  104. return False
  105. if __name__ == "__main__":
  106. test_es_connection()