yingge d7279aa57c graph_rag_server首次提交 3 tháng trước cách đây
..
README.md d7279aa57c graph_rag_server首次提交 3 tháng trước cách đây
__init__.py d7279aa57c graph_rag_server首次提交 3 tháng trước cách đây
client.py d7279aa57c graph_rag_server首次提交 3 tháng trước cách đây
pool.py d7279aa57c graph_rag_server首次提交 3 tháng trước cách đây
test_infinity.py d7279aa57c graph_rag_server首次提交 3 tháng trước cách đây

README.md

Infinity Python API客户端(带连接池)

这是一个基于Infinity数据库的Python API客户端,带有连接池机制,旨在保障高并发下的高可用性。

设计特点

  1. 并发连接管理:支持多线程并发访问
  2. 连接自动回收和复用:避免频繁创建和关闭连接
  3. 连接超时和心跳检测:自动检测和清理无效连接
  4. 动态调整连接数量:根据负载自动调整连接数
  5. 线程安全:所有操作都是线程安全的
  6. 易用的API:提供简洁易用的数据库操作接口

目录结构

└── utils/infinity/
    ├── __init__.py    # 主入口,导入并重新导出所有组件
    ├── client.py      # 客户端实现,包含数据库操作方法
    ├── pool.py        # 连接池实现,包含连接管理和心跳检测
    ├── README.md      # 说明文档
    └── test_infinity.py  # 测试脚本

安装依赖

# 安装Infinity Python SDK
pip install infinity-sdk

快速开始

1. 基本使用

from utils.infinity import InfinityClient

# 创建客户端实例
client = InfinityClient(
    host="192.168.16.134",
    port="23817",
    database="default_db",
    min_connections=2,
    max_connections=10
)

# 获取所有数据库
databases = client.get_databases()
print(f"Databases: {databases}")

# 获取指定数据库的所有表
tables = client.get_tables(database_name="image_db")
print(f"Tables: {tables}")

# 关闭客户端
client.close()

2. 使用全局客户端(单例模式)

from utils.infinity import get_client, close_client

# 获取全局客户端实例
client = get_client()

# 使用客户端
databases = client.get_databases()
print(f"Databases: {databases}")

# 关闭全局客户端
close_client()

3. 使用连接上下文

连接上下文管理器是一种更安全、更高效的使用连接方式,它会确保连接在使用完毕后自动释放回连接池,避免连接泄漏。

完整示例

from utils.infinity import InfinityClient

def main():
    # 创建客户端实例
    client = InfinityClient(database="test_db")
    
    try:
        # 使用连接上下文获取连接
        with client.get_connection() as conn:
            print("=== 连接上下文示例 ===")
            
            # 1. 获取所有数据库
            databases = conn.get_databases()
            print(f"1. 所有数据库: {databases}")
            
            # 2. 获取当前数据库的所有表
            tables = conn.get_tables()
            print(f"2. 当前数据库表: {tables}")
            
            # 3. 创建新表(如果不存在)
            table_name = "example_table"
            print(f"3. 创建表: {table_name}")
            
            # 定义表结构
            fields = [
                {"name": "id", "type": "INT", "is_primary_key": True},
                {"name": "name", "type": "VARCHAR(100)"},
                {"name": "value", "type": "FLOAT"}
            ]
            
            # 先检查表是否存在,如果存在则删除
            if table_name in tables:
                conn.drop_table(table_name)
                print(f"   - 表 {table_name} 已存在,已删除")
            
            # 创建新表
            conn.create_table(table_name, fields)
            print(f"   - 表 {table_name} 创建成功")
            
            # 4. 插入数据
            print(f"4. 向表 {table_name} 插入数据")
            
            # 准备插入的数据
            documents = [
                {"id": 1, "name": "示例1", "value": 10.5},
                {"id": 2, "name": "示例2", "value": 20.3},
                {"id": 3, "name": "示例3", "value": 15.7}
            ]
            
            # 执行插入操作
            conn.insert(table_name, documents)
            print(f"   - 成功插入 {len(documents)} 条数据")
            
            # 5. 执行搜索查询
            print(f"5. 查询表 {table_name} 中的数据")
            
            # 构建查询
            search_query = {
                "field": "name",
                "query": "示例",
                "topn": 2
            }
            
            # 执行查询
            result = conn.search(table_name, ["id", "name", "value"], search_query)
            print(f"   - 查询结果: {result}")
            
            # 6. 执行向量检索(示例,实际需要向量字段)
            print(f"6. 执行向量检索(示例)")
            
            try:
                # 注意:此示例仅用于演示API用法,实际使用需要表中存在向量字段
                vector_query = {
                    "vector_field": "vector",  # 假设存在向量字段
                    "query_vector": [0.1, 0.2, 0.3],
                    "topn": 2
                }
                vector_result = conn.vector_search(table_name, ["id", "name", "value"], vector_query)
                print(f"   - 向量检索结果: {vector_result}")
            except Exception as e:
                print(f"   - 向量检索示例失败(预期行为,因为表中没有向量字段): {e}")
            
            # 7. 再次查看所有表
            updated_tables = conn.get_tables()
            print(f"7. 更新后的表列表: {updated_tables}")
            
            print("\n✅ 所有操作执行完成")
        
        # 8. 连接已自动释放回连接池
        print("\n✅ 连接已自动释放回连接池")
        
    except Exception as e:
        print(f"\n❌ 操作失败: {e}")
    finally:
        # 关闭客户端
        client.close()
        print("✅ 客户端已关闭")

if __name__ == "__main__":
    main()

多操作上下文示例

from utils.infinity import InfinityClient

# 创建客户端实例
client = InfinityClient(database="test_db")

# 示例:在同一个连接上下文中执行多个相关操作
try:
    with client.get_connection() as conn:
        # 操作1:创建表
        conn.create_table("temp_table", [
            {"name": "id", "type": "INT", "is_primary_key": True},
            {"name": "data", "type": "VARCHAR(255)"}
        ])
        
        # 操作2:插入数据
        conn.insert("temp_table", [{"id": 1, "data": "test1"}, {"id": 2, "data": "test2"}])
        
        # 操作3:查询数据
        search_query = {
            "field": "data",
            "query": "test",
            "topn": 2
        }
        result = conn.search("temp_table", ["id", "data"], search_query)
        print(f"查询结果: {result}")
        
        # 操作4:删除表
        conn.drop_table("temp_table")
        
        print("✅ 所有相关操作在同一个连接上下文中完成")
except Exception as e:
    print(f"❌ 操作失败: {e}")
finally:
    client.close()

异常处理上下文示例

from utils.infinity import InfinityClient

# 创建客户端实例
client = InfinityClient(database="test_db")

try:
    with client.get_connection() as conn:
        # 执行可能失败的操作
        conn.create_table("error_table", [
            {"name": "id", "type": "INT", "is_primary_key": True},
            {"name": "invalid_field", "type": "INVALID_TYPE"}  # 无效字段类型
        ])
        
    # 注意:如果上面的操作失败,代码不会执行到这里
    print("✅ 操作成功")
except Exception as e:
    # 捕获并处理异常
    print(f"❌ 操作失败,已捕获异常: {e}")
finally:
    # 无论操作是否成功,客户端都会被关闭
    client.close()
    print("✅ 客户端已关闭")

4. 搜索示例

from utils.infinity import InfinityClient

client = InfinityClient(database="image_db")

# 执行搜索查询
result = client.search(
    table_name="pdf_documents_table",
    output_fields=["id", "title", "content"],
    query={
        "field": "content",
        "query": "儿童绘本",
        "topn": 5
    }
)

print(f"Search result: {result}")

client.close()

5. 混合检索示例

from utils.infinity import InfinityClient

client = InfinityClient(database="image_db")

# 执行混合检索
result = client.hybrid_search(
    table_name="pdf_documents_table",
    output_fields=["id", "title", "content", "score"],
    query={
        "vector_field": "dense_vector_1024",
        "query_vector": [0.1, 0.2, 0.3, ...],  # 实际向量
        "field": "content",
        "query": "儿童绘本",
        "topn": 5,
        "fusion_weight": 0.5
    }
)

print(f"Hybrid search result: {result}")

client.close()

6. 向量检索示例

from utils.infinity import InfinityClient

client = InfinityClient(database="image_db")

# 执行向量检索
result = client.vector_search(
    table_name="pdf_documents_table",
    output_fields=["id", "title", "content", "score"],
    query={
        "vector_field": "dense_vector_1024",
        "query_vector": [0.1, 0.2, 0.3, ...],  # 实际向量
        "topn": 5
    }
)

print(f"Vector search result: {result}")

client.close()

核心功能

数据库操作

  • get_databases(): 获取所有数据库
  • create_database(database_name): 创建数据库
  • drop_database(database_name): 删除数据库
  • use_database(database_name): 切换数据库

表操作

  • get_tables(database_name=None): 获取所有表
  • create_table(table_name, fields, database_name=None): 创建表
  • drop_table(table_name, database_name=None): 删除表

文档操作

  • insert(table_name, documents, database_name=None): 插入文档

检索操作

  • search(table_name, output_fields, query, database_name=None): 搜索文档

    • table_name: 表名
    • output_fields: 要返回的字段列表
    • query: 查询条件,包含fieldquerytopn字段
    • database_name: 数据库名称(可选,默认使用客户端配置的数据库)
  • hybrid_search(table_name, output_fields, query, database_name=None): 混合检索

    • table_name: 表名
    • output_fields: 要返回的字段列表
    • query: 查询条件,包含vector_fieldquery_vectorfieldquerytopnfusion_weight字段
    • database_name: 数据库名称(可选,默认使用客户端配置的数据库)
  • vector_search(table_name, output_fields, query, database_name=None): 向量检索

    • table_name: 表名
    • output_fields: 要返回的字段列表
    • query: 查询条件,包含vector_fieldquery_vectortopn字段
    • database_name: 数据库名称(可选,默认使用客户端配置的数据库)

连接池管理

  • get_status(): 获取连接池状态
  • close(): 关闭所有连接

配置参数

InfinityClient 配置

参数 类型 默认值 说明
host str "192.168.16.134" Infinity服务地址
port str "23817" Infinity服务端口
database str "default_db" 数据库名称
min_connections int 2 最小连接数
max_connections int 10 最大连接数

InfinityConnectionPool 配置

参数 类型 默认值 说明
host str "192.168.16.134" Infinity服务地址
port str "23817" Infinity服务端口
database str "default_db" 数据库名称
min_connections int 2 最小连接数
max_connections int 10 最大连接数
connection_timeout int 30 连接超时时间(秒)
idle_timeout int 300 空闲连接超时时间(秒)
heartbeat_interval int 60 心跳检测间隔(秒)

连接池状态

可以通过 get_status() 方法获取连接池的状态信息:

from utils.infinity import InfinityClient

client = InfinityClient()
status = client.get_status()
print(f"Connection pool status: {status}")

输出示例:

Connection pool status: {
    "total_connections": 5,
    "available_connections": 3,
    "in_use_connections": 2,
    "min_connections": 2,
    "max_connections": 10
}

高并发使用建议

  1. 调整连接池大小:根据实际并发量调整 min_connectionsmax_connections
  2. 使用全局客户端:在多线程环境中使用全局客户端实例
  3. 合理设置超时时间:根据网络环境和数据库性能调整超时参数
  4. 使用连接上下文:使用 with client.get_connection() 确保连接正确释放
  5. 定期检查连接池状态:监控连接池状态,及时调整配置

异常处理

from utils.infinity import InfinityClient

try:
    client = InfinityClient()
    databases = client.get_databases()
    print(f"Databases: {databases}")
except Exception as e:
    print(f"Error: {e}")
finally:
    client.close()

最佳实践

  1. 在应用启动时初始化客户端:避免频繁创建和销毁客户端
  2. 在应用关闭时关闭客户端:释放所有连接资源
  3. 使用连接上下文管理连接:确保连接正确释放
  4. 监控连接池状态:及时发现和解决连接问题
  5. 根据负载调整连接池大小:避免连接过多或不足

与现有代码集成

可以将此客户端与现有代码无缝集成,替换直接使用Infinity SDK的地方:

原有代码

import infinity
from infinity.common import NetworkAddress

# 直接创建连接
conn = infinity.connect(NetworkAddress("192.168.16.134", "23817"))
conn.use_database("image_db")
# 使用连接
result = conn.search("my_table", query)

集成后

from utils.infinity import InfinityClient

# 使用连接池客户端
client = InfinityClient(database="image_db")
result = client.search("my_table", query)

性能优化

  1. 减少连接创建开销:连接池自动管理连接,避免频繁创建和关闭连接
  2. 连接复用:连接可以被多次复用,提高性能
  3. 异步连接管理:连接池异步管理连接,不会阻塞主线程
  4. 心跳检测:自动清理无效连接,保持连接池健康

测试

可以使用以下方法测试连接池的性能:

from utils.infinity import InfinityClient
import threading
import time

def test_query(client, thread_id):
    """测试查询性能"""
    start_time = time.time()
    databases = client.get_databases()
    end_time = time.time()
    print(f"Thread {thread_id}: Query took {end_time - start_time:.4f} seconds")

# 创建客户端
client = InfinityClient(max_connections=20)

# 测试并发查询
threads = []
start_time = time.time()

for i in range(50):
    thread = threading.Thread(target=test_query, args=(client, i))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

end_time = time.time()
print(f"Total time for 50 concurrent queries: {end_time - start_time:.4f} seconds")
print(f"Connection pool status: {client.get_status()}")

# 关闭客户端
client.close()

版本历史

  • v1.0.0: 初始版本,实现了基本的连接池功能和数据库操作接口

许可证

MIT License