|
|
3 mesiacov pred | |
|---|---|---|
| .. | ||
| __pycache__ | 3 mesiacov pred | |
| README.md | 3 mesiacov pred | |
| __init__.py | 3 mesiacov pred | |
| client.py | 3 mesiacov pred | |
| pool.py | 3 mesiacov pred | |
| test_infinity.py | 3 mesiacov pred | |
这是一个基于Infinity数据库的Python API客户端,带有连接池机制,旨在保障高并发下的高可用性。
└── utils/infinity/
├── __init__.py # 主入口,导入并重新导出所有组件
├── client.py # 客户端实现,包含数据库操作方法
├── pool.py # 连接池实现,包含连接管理和心跳检测
├── README.md # 说明文档
└── test_infinity.py # 测试脚本
# 安装Infinity Python SDK
pip install infinity-sdk
from utils.infinity import InfinityClient
# 创建客户端实例
client = InfinityClient(
host="192.168.16.134",
port="23817",
database="default_db",
min_connections=2,
max_connections=10
)
# 获取所有数据库
databases = client.get_databases()
print(f"Databases: {databases}")
# 获取指定数据库的所有表
tables = client.get_tables(database_name="image_db")
print(f"Tables: {tables}")
# 关闭客户端
client.close()
from utils.infinity import get_client, close_client
# 获取全局客户端实例
client = get_client()
# 使用客户端
databases = client.get_databases()
print(f"Databases: {databases}")
# 关闭全局客户端
close_client()
连接上下文管理器是一种更安全、更高效的使用连接方式,它会确保连接在使用完毕后自动释放回连接池,避免连接泄漏。
from utils.infinity import InfinityClient
def main():
# 创建客户端实例
client = InfinityClient(database="test_db")
try:
# 使用连接上下文获取连接
with client.get_connection() as conn:
print("=== 连接上下文示例 ===")
# 1. 获取所有数据库
databases = conn.get_databases()
print(f"1. 所有数据库: {databases}")
# 2. 获取当前数据库的所有表
tables = conn.get_tables()
print(f"2. 当前数据库表: {tables}")
# 3. 创建新表(如果不存在)
table_name = "example_table"
print(f"3. 创建表: {table_name}")
# 定义表结构
fields = [
{"name": "id", "type": "INT", "is_primary_key": True},
{"name": "name", "type": "VARCHAR(100)"},
{"name": "value", "type": "FLOAT"}
]
# 先检查表是否存在,如果存在则删除
if table_name in tables:
conn.drop_table(table_name)
print(f" - 表 {table_name} 已存在,已删除")
# 创建新表
conn.create_table(table_name, fields)
print(f" - 表 {table_name} 创建成功")
# 4. 插入数据
print(f"4. 向表 {table_name} 插入数据")
# 准备插入的数据
documents = [
{"id": 1, "name": "示例1", "value": 10.5},
{"id": 2, "name": "示例2", "value": 20.3},
{"id": 3, "name": "示例3", "value": 15.7}
]
# 执行插入操作
conn.insert(table_name, documents)
print(f" - 成功插入 {len(documents)} 条数据")
# 5. 执行搜索查询
print(f"5. 查询表 {table_name} 中的数据")
# 构建查询
search_query = {
"field": "name",
"query": "示例",
"topn": 2
}
# 执行查询
result = conn.search(table_name, ["id", "name", "value"], search_query)
print(f" - 查询结果: {result}")
# 6. 执行向量检索(示例,实际需要向量字段)
print(f"6. 执行向量检索(示例)")
try:
# 注意:此示例仅用于演示API用法,实际使用需要表中存在向量字段
vector_query = {
"vector_field": "vector", # 假设存在向量字段
"query_vector": [0.1, 0.2, 0.3],
"topn": 2
}
vector_result = conn.vector_search(table_name, ["id", "name", "value"], vector_query)
print(f" - 向量检索结果: {vector_result}")
except Exception as e:
print(f" - 向量检索示例失败(预期行为,因为表中没有向量字段): {e}")
# 7. 再次查看所有表
updated_tables = conn.get_tables()
print(f"7. 更新后的表列表: {updated_tables}")
print("\n✅ 所有操作执行完成")
# 8. 连接已自动释放回连接池
print("\n✅ 连接已自动释放回连接池")
except Exception as e:
print(f"\n❌ 操作失败: {e}")
finally:
# 关闭客户端
client.close()
print("✅ 客户端已关闭")
if __name__ == "__main__":
main()
from utils.infinity import InfinityClient
# 创建客户端实例
client = InfinityClient(database="test_db")
# 示例:在同一个连接上下文中执行多个相关操作
try:
with client.get_connection() as conn:
# 操作1:创建表
conn.create_table("temp_table", [
{"name": "id", "type": "INT", "is_primary_key": True},
{"name": "data", "type": "VARCHAR(255)"}
])
# 操作2:插入数据
conn.insert("temp_table", [{"id": 1, "data": "test1"}, {"id": 2, "data": "test2"}])
# 操作3:查询数据
search_query = {
"field": "data",
"query": "test",
"topn": 2
}
result = conn.search("temp_table", ["id", "data"], search_query)
print(f"查询结果: {result}")
# 操作4:删除表
conn.drop_table("temp_table")
print("✅ 所有相关操作在同一个连接上下文中完成")
except Exception as e:
print(f"❌ 操作失败: {e}")
finally:
client.close()
from utils.infinity import InfinityClient
# 创建客户端实例
client = InfinityClient(database="test_db")
try:
with client.get_connection() as conn:
# 执行可能失败的操作
conn.create_table("error_table", [
{"name": "id", "type": "INT", "is_primary_key": True},
{"name": "invalid_field", "type": "INVALID_TYPE"} # 无效字段类型
])
# 注意:如果上面的操作失败,代码不会执行到这里
print("✅ 操作成功")
except Exception as e:
# 捕获并处理异常
print(f"❌ 操作失败,已捕获异常: {e}")
finally:
# 无论操作是否成功,客户端都会被关闭
client.close()
print("✅ 客户端已关闭")
from utils.infinity import InfinityClient
client = InfinityClient(database="image_db")
# 执行搜索查询
result = client.search(
table_name="pdf_documents_table",
output_fields=["id", "title", "content"],
query={
"field": "content",
"query": "儿童绘本",
"topn": 5
}
)
print(f"Search result: {result}")
client.close()
from utils.infinity import InfinityClient
client = InfinityClient(database="image_db")
# 执行混合检索
result = client.hybrid_search(
table_name="pdf_documents_table",
output_fields=["id", "title", "content", "score"],
query={
"vector_field": "dense_vector_1024",
"query_vector": [0.1, 0.2, 0.3, ...], # 实际向量
"field": "content",
"query": "儿童绘本",
"topn": 5,
"fusion_weight": 0.5
}
)
print(f"Hybrid search result: {result}")
client.close()
from utils.infinity import InfinityClient
client = InfinityClient(database="image_db")
# 执行向量检索
result = client.vector_search(
table_name="pdf_documents_table",
output_fields=["id", "title", "content", "score"],
query={
"vector_field": "dense_vector_1024",
"query_vector": [0.1, 0.2, 0.3, ...], # 实际向量
"topn": 5
}
)
print(f"Vector search result: {result}")
client.close()
get_databases(): 获取所有数据库create_database(database_name): 创建数据库drop_database(database_name): 删除数据库use_database(database_name): 切换数据库get_tables(database_name=None): 获取所有表create_table(table_name, fields, database_name=None): 创建表drop_table(table_name, database_name=None): 删除表insert(table_name, documents, database_name=None): 插入文档search(table_name, output_fields, query, database_name=None): 搜索文档
table_name: 表名output_fields: 要返回的字段列表query: 查询条件,包含field、query和topn字段database_name: 数据库名称(可选,默认使用客户端配置的数据库)hybrid_search(table_name, output_fields, query, database_name=None): 混合检索
table_name: 表名output_fields: 要返回的字段列表query: 查询条件,包含vector_field、query_vector、field、query、topn和fusion_weight字段database_name: 数据库名称(可选,默认使用客户端配置的数据库)vector_search(table_name, output_fields, query, database_name=None): 向量检索
table_name: 表名output_fields: 要返回的字段列表query: 查询条件,包含vector_field、query_vector和topn字段database_name: 数据库名称(可选,默认使用客户端配置的数据库)get_status(): 获取连接池状态close(): 关闭所有连接| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| host | str | "192.168.16.134" | Infinity服务地址 |
| port | str | "23817" | Infinity服务端口 |
| database | str | "default_db" | 数据库名称 |
| min_connections | int | 2 | 最小连接数 |
| max_connections | int | 10 | 最大连接数 |
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| host | str | "192.168.16.134" | Infinity服务地址 |
| port | str | "23817" | Infinity服务端口 |
| database | str | "default_db" | 数据库名称 |
| min_connections | int | 2 | 最小连接数 |
| max_connections | int | 10 | 最大连接数 |
| connection_timeout | int | 30 | 连接超时时间(秒) |
| idle_timeout | int | 300 | 空闲连接超时时间(秒) |
| heartbeat_interval | int | 60 | 心跳检测间隔(秒) |
可以通过 get_status() 方法获取连接池的状态信息:
from utils.infinity import InfinityClient
client = InfinityClient()
status = client.get_status()
print(f"Connection pool status: {status}")
输出示例:
Connection pool status: {
"total_connections": 5,
"available_connections": 3,
"in_use_connections": 2,
"min_connections": 2,
"max_connections": 10
}
min_connections 和 max_connectionswith client.get_connection() 确保连接正确释放from utils.infinity import InfinityClient
try:
client = InfinityClient()
databases = client.get_databases()
print(f"Databases: {databases}")
except Exception as e:
print(f"Error: {e}")
finally:
client.close()
可以将此客户端与现有代码无缝集成,替换直接使用Infinity SDK的地方:
import infinity
from infinity.common import NetworkAddress
# 直接创建连接
conn = infinity.connect(NetworkAddress("192.168.16.134", "23817"))
conn.use_database("image_db")
# 使用连接
result = conn.search("my_table", query)
from utils.infinity import InfinityClient
# 使用连接池客户端
client = InfinityClient(database="image_db")
result = client.search("my_table", query)
可以使用以下方法测试连接池的性能:
from utils.infinity import InfinityClient
import threading
import time
def test_query(client, thread_id):
"""测试查询性能"""
start_time = time.time()
databases = client.get_databases()
end_time = time.time()
print(f"Thread {thread_id}: Query took {end_time - start_time:.4f} seconds")
# 创建客户端
client = InfinityClient(max_connections=20)
# 测试并发查询
threads = []
start_time = time.time()
for i in range(50):
thread = threading.Thread(target=test_query, args=(client, i))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print(f"Total time for 50 concurrent queries: {end_time - start_time:.4f} seconds")
print(f"Connection pool status: {client.get_status()}")
# 关闭客户端
client.close()
MIT License