# Infinity Python API客户端(带连接池) 这是一个基于Infinity数据库的Python API客户端,带有连接池机制,旨在保障高并发下的高可用性。 ## 设计特点 1. **并发连接管理**:支持多线程并发访问 2. **连接自动回收和复用**:避免频繁创建和关闭连接 3. **连接超时和心跳检测**:自动检测和清理无效连接 4. **动态调整连接数量**:根据负载自动调整连接数 5. **线程安全**:所有操作都是线程安全的 6. **易用的API**:提供简洁易用的数据库操作接口 ## 目录结构 ``` └── utils/infinity/ ├── __init__.py # 主入口,导入并重新导出所有组件 ├── client.py # 客户端实现,包含数据库操作方法 ├── pool.py # 连接池实现,包含连接管理和心跳检测 ├── README.md # 说明文档 └── test_infinity.py # 测试脚本 ``` ## 安装依赖 ```bash # 安装Infinity Python SDK pip install infinity-sdk ``` ## 快速开始 ### 1. 基本使用 ```python from utils.infinity import InfinityClient # 创建客户端实例 client = InfinityClient( host="192.168.16.134", port="23817", database="default_db", min_connections=2, max_connections=10 ) # 获取所有数据库 databases = client.get_databases() print(f"Databases: {databases}") # 获取指定数据库的所有表 tables = client.get_tables(database_name="image_db") print(f"Tables: {tables}") # 关闭客户端 client.close() ``` ### 2. 使用全局客户端(单例模式) ```python from utils.infinity import get_client, close_client # 获取全局客户端实例 client = get_client() # 使用客户端 databases = client.get_databases() print(f"Databases: {databases}") # 关闭全局客户端 close_client() ``` ### 3. 使用连接上下文 连接上下文管理器是一种更安全、更高效的使用连接方式,它会确保连接在使用完毕后自动释放回连接池,避免连接泄漏。 #### 完整示例 ```python from utils.infinity import InfinityClient def main(): # 创建客户端实例 client = InfinityClient(database="test_db") try: # 使用连接上下文获取连接 with client.get_connection() as conn: print("=== 连接上下文示例 ===") # 1. 获取所有数据库 databases = conn.get_databases() print(f"1. 所有数据库: {databases}") # 2. 获取当前数据库的所有表 tables = conn.get_tables() print(f"2. 当前数据库表: {tables}") # 3. 创建新表(如果不存在) table_name = "example_table" print(f"3. 创建表: {table_name}") # 定义表结构 fields = [ {"name": "id", "type": "INT", "is_primary_key": True}, {"name": "name", "type": "VARCHAR(100)"}, {"name": "value", "type": "FLOAT"} ] # 先检查表是否存在,如果存在则删除 if table_name in tables: conn.drop_table(table_name) print(f" - 表 {table_name} 已存在,已删除") # 创建新表 conn.create_table(table_name, fields) print(f" - 表 {table_name} 创建成功") # 4. 插入数据 print(f"4. 向表 {table_name} 插入数据") # 准备插入的数据 documents = [ {"id": 1, "name": "示例1", "value": 10.5}, {"id": 2, "name": "示例2", "value": 20.3}, {"id": 3, "name": "示例3", "value": 15.7} ] # 执行插入操作 conn.insert(table_name, documents) print(f" - 成功插入 {len(documents)} 条数据") # 5. 执行搜索查询 print(f"5. 查询表 {table_name} 中的数据") # 构建查询 search_query = { "field": "name", "query": "示例", "topn": 2 } # 执行查询 result = conn.search(table_name, ["id", "name", "value"], search_query) print(f" - 查询结果: {result}") # 6. 执行向量检索(示例,实际需要向量字段) print(f"6. 执行向量检索(示例)") try: # 注意:此示例仅用于演示API用法,实际使用需要表中存在向量字段 vector_query = { "vector_field": "vector", # 假设存在向量字段 "query_vector": [0.1, 0.2, 0.3], "topn": 2 } vector_result = conn.vector_search(table_name, ["id", "name", "value"], vector_query) print(f" - 向量检索结果: {vector_result}") except Exception as e: print(f" - 向量检索示例失败(预期行为,因为表中没有向量字段): {e}") # 7. 再次查看所有表 updated_tables = conn.get_tables() print(f"7. 更新后的表列表: {updated_tables}") print("\n✅ 所有操作执行完成") # 8. 连接已自动释放回连接池 print("\n✅ 连接已自动释放回连接池") except Exception as e: print(f"\n❌ 操作失败: {e}") finally: # 关闭客户端 client.close() print("✅ 客户端已关闭") if __name__ == "__main__": main() ``` #### 多操作上下文示例 ```python from utils.infinity import InfinityClient # 创建客户端实例 client = InfinityClient(database="test_db") # 示例:在同一个连接上下文中执行多个相关操作 try: with client.get_connection() as conn: # 操作1:创建表 conn.create_table("temp_table", [ {"name": "id", "type": "INT", "is_primary_key": True}, {"name": "data", "type": "VARCHAR(255)"} ]) # 操作2:插入数据 conn.insert("temp_table", [{"id": 1, "data": "test1"}, {"id": 2, "data": "test2"}]) # 操作3:查询数据 search_query = { "field": "data", "query": "test", "topn": 2 } result = conn.search("temp_table", ["id", "data"], search_query) print(f"查询结果: {result}") # 操作4:删除表 conn.drop_table("temp_table") print("✅ 所有相关操作在同一个连接上下文中完成") except Exception as e: print(f"❌ 操作失败: {e}") finally: client.close() ``` #### 异常处理上下文示例 ```python from utils.infinity import InfinityClient # 创建客户端实例 client = InfinityClient(database="test_db") try: with client.get_connection() as conn: # 执行可能失败的操作 conn.create_table("error_table", [ {"name": "id", "type": "INT", "is_primary_key": True}, {"name": "invalid_field", "type": "INVALID_TYPE"} # 无效字段类型 ]) # 注意:如果上面的操作失败,代码不会执行到这里 print("✅ 操作成功") except Exception as e: # 捕获并处理异常 print(f"❌ 操作失败,已捕获异常: {e}") finally: # 无论操作是否成功,客户端都会被关闭 client.close() print("✅ 客户端已关闭") ``` ### 4. 搜索示例 ```python from utils.infinity import InfinityClient client = InfinityClient(database="image_db") # 执行搜索查询 result = client.search( table_name="pdf_documents_table", output_fields=["id", "title", "content"], query={ "field": "content", "query": "儿童绘本", "topn": 5 } ) print(f"Search result: {result}") client.close() ``` ### 5. 混合检索示例 ```python from utils.infinity import InfinityClient client = InfinityClient(database="image_db") # 执行混合检索 result = client.hybrid_search( table_name="pdf_documents_table", output_fields=["id", "title", "content", "score"], query={ "vector_field": "dense_vector_1024", "query_vector": [0.1, 0.2, 0.3, ...], # 实际向量 "field": "content", "query": "儿童绘本", "topn": 5, "fusion_weight": 0.5 } ) print(f"Hybrid search result: {result}") client.close() ``` ### 6. 向量检索示例 ```python from utils.infinity import InfinityClient client = InfinityClient(database="image_db") # 执行向量检索 result = client.vector_search( table_name="pdf_documents_table", output_fields=["id", "title", "content", "score"], query={ "vector_field": "dense_vector_1024", "query_vector": [0.1, 0.2, 0.3, ...], # 实际向量 "topn": 5 } ) print(f"Vector search result: {result}") client.close() ``` ## 核心功能 ### 数据库操作 - `get_databases()`: 获取所有数据库 - `create_database(database_name)`: 创建数据库 - `drop_database(database_name)`: 删除数据库 - `use_database(database_name)`: 切换数据库 ### 表操作 - `get_tables(database_name=None)`: 获取所有表 - `create_table(table_name, fields, database_name=None)`: 创建表 - `drop_table(table_name, database_name=None)`: 删除表 ### 文档操作 - `insert(table_name, documents, database_name=None)`: 插入文档 ### 检索操作 - `search(table_name, output_fields, query, database_name=None)`: 搜索文档 - `table_name`: 表名 - `output_fields`: 要返回的字段列表 - `query`: 查询条件,包含`field`、`query`和`topn`字段 - `database_name`: 数据库名称(可选,默认使用客户端配置的数据库) - `hybrid_search(table_name, output_fields, query, database_name=None)`: 混合检索 - `table_name`: 表名 - `output_fields`: 要返回的字段列表 - `query`: 查询条件,包含`vector_field`、`query_vector`、`field`、`query`、`topn`和`fusion_weight`字段 - `database_name`: 数据库名称(可选,默认使用客户端配置的数据库) - `vector_search(table_name, output_fields, query, database_name=None)`: 向量检索 - `table_name`: 表名 - `output_fields`: 要返回的字段列表 - `query`: 查询条件,包含`vector_field`、`query_vector`和`topn`字段 - `database_name`: 数据库名称(可选,默认使用客户端配置的数据库) ### 连接池管理 - `get_status()`: 获取连接池状态 - `close()`: 关闭所有连接 ## 配置参数 ### InfinityClient 配置 | 参数 | 类型 | 默认值 | 说明 | |------|------|--------|------| | host | str | "192.168.16.134" | Infinity服务地址 | | port | str | "23817" | Infinity服务端口 | | database | str | "default_db" | 数据库名称 | | min_connections | int | 2 | 最小连接数 | | max_connections | int | 10 | 最大连接数 | ### InfinityConnectionPool 配置 | 参数 | 类型 | 默认值 | 说明 | |------|------|--------|------| | host | str | "192.168.16.134" | Infinity服务地址 | | port | str | "23817" | Infinity服务端口 | | database | str | "default_db" | 数据库名称 | | min_connections | int | 2 | 最小连接数 | | max_connections | int | 10 | 最大连接数 | | connection_timeout | int | 30 | 连接超时时间(秒) | | idle_timeout | int | 300 | 空闲连接超时时间(秒) | | heartbeat_interval | int | 60 | 心跳检测间隔(秒) | ## 连接池状态 可以通过 `get_status()` 方法获取连接池的状态信息: ```python from utils.infinity import InfinityClient client = InfinityClient() status = client.get_status() print(f"Connection pool status: {status}") ``` 输出示例: ``` Connection pool status: { "total_connections": 5, "available_connections": 3, "in_use_connections": 2, "min_connections": 2, "max_connections": 10 } ``` ## 高并发使用建议 1. **调整连接池大小**:根据实际并发量调整 `min_connections` 和 `max_connections` 2. **使用全局客户端**:在多线程环境中使用全局客户端实例 3. **合理设置超时时间**:根据网络环境和数据库性能调整超时参数 4. **使用连接上下文**:使用 `with client.get_connection()` 确保连接正确释放 5. **定期检查连接池状态**:监控连接池状态,及时调整配置 ## 异常处理 ```python from utils.infinity import InfinityClient try: client = InfinityClient() databases = client.get_databases() print(f"Databases: {databases}") except Exception as e: print(f"Error: {e}") finally: client.close() ``` ## 最佳实践 1. **在应用启动时初始化客户端**:避免频繁创建和销毁客户端 2. **在应用关闭时关闭客户端**:释放所有连接资源 3. **使用连接上下文管理连接**:确保连接正确释放 4. **监控连接池状态**:及时发现和解决连接问题 5. **根据负载调整连接池大小**:避免连接过多或不足 ## 与现有代码集成 可以将此客户端与现有代码无缝集成,替换直接使用Infinity SDK的地方: ### 原有代码 ```python import infinity from infinity.common import NetworkAddress # 直接创建连接 conn = infinity.connect(NetworkAddress("192.168.16.134", "23817")) conn.use_database("image_db") # 使用连接 result = conn.search("my_table", query) ``` ### 集成后 ```python from utils.infinity import InfinityClient # 使用连接池客户端 client = InfinityClient(database="image_db") result = client.search("my_table", query) ``` ## 性能优化 1. **减少连接创建开销**:连接池自动管理连接,避免频繁创建和关闭连接 2. **连接复用**:连接可以被多次复用,提高性能 3. **异步连接管理**:连接池异步管理连接,不会阻塞主线程 4. **心跳检测**:自动清理无效连接,保持连接池健康 ## 测试 可以使用以下方法测试连接池的性能: ```python from utils.infinity import InfinityClient import threading import time def test_query(client, thread_id): """测试查询性能""" start_time = time.time() databases = client.get_databases() end_time = time.time() print(f"Thread {thread_id}: Query took {end_time - start_time:.4f} seconds") # 创建客户端 client = InfinityClient(max_connections=20) # 测试并发查询 threads = [] start_time = time.time() for i in range(50): thread = threading.Thread(target=test_query, args=(client, i)) threads.append(thread) thread.start() # 等待所有线程完成 for thread in threads: thread.join() end_time = time.time() print(f"Total time for 50 concurrent queries: {end_time - start_time:.4f} seconds") print(f"Connection pool status: {client.get_status()}") # 关闭客户端 client.close() ``` ## 版本历史 - v1.0.0: 初始版本,实现了基本的连接池功能和数据库操作接口 ## 许可证 MIT License