| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- # Infinity数据库连接池实现
- from typing import Dict, Any, List, Optional
- import threading
- import time
- from contextlib import contextmanager
- from conf.config import VectorDBConfig
- class InfinityConnectionPool:
- """
- Infinity数据库连接池
-
- 设计特点:
- 1. 支持并发连接管理
- 2. 连接自动回收和复用
- 3. 连接超时和心跳检测
- 4. 动态调整连接数量
- 5. 线程安全
- """
-
- def __init__(
- self,
- host: str = VectorDBConfig.get_infinity_host(),
- port: str = VectorDBConfig.get_infinity_sdk_port(),
- database: str = VectorDBConfig.get_infinity_database(),
- min_connections: int = 2,
- max_connections: int = 10,
- connection_timeout: int = 30,
- idle_timeout: int = 300,
- heartbeat_interval: int = 60
- ):
- """
- 初始化连接池
-
- Args:
- host: Infinity服务地址
- port: Infinity服务端口
- database: 数据库名称
- min_connections: 最小连接数
- max_connections: 最大连接数
- connection_timeout: 连接超时时间(秒)
- idle_timeout: 空闲连接超时时间(秒)
- heartbeat_interval: 心跳检测间隔(秒)
- """
- self.host = host
- self.port = port
- self.database = database
- self.min_connections = min_connections
- self.max_connections = max_connections
- self.connection_timeout = connection_timeout
- self.idle_timeout = idle_timeout
- self.heartbeat_interval = heartbeat_interval
-
- # 连接池状态
- self.connections = [] # 可用连接列表
- self.in_use = {} # 正在使用的连接 {connection: thread_id}
- self.connection_count = 0 # 当前连接总数
-
- # 线程安全锁
- self.lock = threading.Lock()
- self.condition = threading.Condition(self.lock)
-
- # 初始化最小连接数
- self._init_connections()
-
- # 启动心跳检测线程
- self.heartbeat_thread = threading.Thread(target=self._heartbeat_check, daemon=True)
- self.heartbeat_thread.start()
-
- def _init_connections(self):
- """初始化最小连接数"""
- for _ in range(self.min_connections):
- self._create_connection()
-
- def _create_connection(self) -> Any:
- """创建新连接"""
- try:
- import infinity
-
- # 连接到Infinity服务
- connection = infinity.connect(
- infinity.NetworkAddress(self.host, self.port)
- )
-
- # 注意:根据官方API,RemoteThriftInfinityConnection对象没有use_database方法
- # 数据库操作应该通过create_database、drop_database等方法直接指定数据库名称
- # 或者通过获取Database对象后再进行操作
-
- # 保存数据库名称,供后续操作使用
- connection.__dict__['_database'] = self.database
-
- # 记录连接创建时间
- connection.__dict__['_created_at'] = time.time()
- connection.__dict__['_last_used'] = time.time()
- connection.__dict__['_is_valid'] = True
-
- with self.lock:
- self.connections.append(connection)
- self.connection_count += 1
-
- return connection
- except Exception as e:
- raise Exception(f"Failed to create Infinity connection: {e}")
-
- def _is_valid_connection(self, connection: Any) -> bool:
- """检查连接是否有效"""
- try:
- # 通过执行简单查询检查连接是否有效
- connection.get_database(self.database)
- return True
- except Exception:
- return False
-
- def _heartbeat_check(self):
- """心跳检测,定期检查连接有效性并清理过期连接"""
- while True:
- time.sleep(self.heartbeat_interval)
- self._cleanup_connections()
-
- def _cleanup_connections(self):
- """清理无效或过期连接"""
- with self.lock:
- current_time = time.time()
- valid_connections = []
-
- for connection in self.connections:
- # 检查连接是否过期
- if current_time - connection.__dict__['_last_used'] > self.idle_timeout:
- # 关闭过期连接
- try:
- connection.disconnect()
- self.connection_count -= 1
- except Exception:
- pass
- elif not self._is_valid_connection(connection):
- # 关闭无效连接
- try:
- connection.disconnect()
- self.connection_count -= 1
- except Exception:
- pass
- else:
- valid_connections.append(connection)
-
- self.connections = valid_connections
-
- # 确保连接数不低于最小值
- while self.connection_count < self.min_connections:
- self._create_connection()
-
- @contextmanager
- def get_connection(self, timeout: Optional[int] = None) -> Any:
- """
- 获取一个连接,使用上下文管理器
-
- Args:
- timeout: 获取连接的超时时间(秒)
-
- Yields:
- Infinity连接对象
- """
- connection = None
- try:
- connection = self.acquire(timeout)
- yield connection
- finally:
- if connection:
- self.release(connection)
-
- def acquire(self, timeout: Optional[int] = None) -> Any:
- """
- 获取一个连接
-
- Args:
- timeout: 获取连接的超时时间(秒)
-
- Returns:
- Infinity连接对象
-
- Raises:
- TimeoutError: 获取连接超时
- """
- end_time = time.time() + (timeout or self.connection_timeout)
-
- with self.lock:
- while True:
- # 检查是否有可用连接
- if self.connections:
- # 获取一个连接
- connection = self.connections.pop()
-
- # 检查连接是否有效
- if self._is_valid_connection(connection):
- connection.__dict__['_last_used'] = time.time()
- self.in_use[connection] = threading.get_ident()
- return connection
- else:
- # 连接无效,关闭并计数减一
- try:
- connection.disconnect()
- self.connection_count -= 1
- except Exception:
- pass
-
- # 检查是否可以创建新连接
- elif self.connection_count < self.max_connections:
- # 创建新连接
- connection = self._create_connection()
- connection.__dict__['_last_used'] = time.time()
- self.in_use[connection] = threading.get_ident()
- return connection
-
- # 等待连接释放
- remaining = end_time - time.time()
- if remaining <= 0:
- raise TimeoutError("Timeout waiting for Infinity connection")
-
- # 等待连接释放或超时
- self.condition.wait(remaining)
-
- def release(self, connection: Any):
- """
- 释放连接
-
- Args:
- connection: 要释放的连接
- """
- with self.lock:
- if connection in self.in_use:
- del self.in_use[connection]
-
- # 检查连接是否有效
- if self._is_valid_connection(connection):
- connection.__dict__['_last_used'] = time.time()
- self.connections.append(connection)
- # 通知等待的线程
- self.condition.notify()
- else:
- # 连接无效,关闭并计数减一
- try:
- connection.disconnect()
- self.connection_count -= 1
- except Exception:
- pass
-
- def close(self):
- """关闭所有连接"""
- with self.lock:
- # 关闭可用连接
- for connection in self.connections:
- try:
- connection.disconnect()
- except Exception:
- pass
-
- # 关闭正在使用的连接
- for connection in list(self.in_use.keys()):
- try:
- connection.disconnect()
- except Exception:
- pass
-
- self.connections = []
- self.in_use = {}
- self.connection_count = 0
-
- def get_status(self) -> Dict[str, Any]:
- """
- 获取连接池状态
-
- Returns:
- 连接池状态信息
- """
- with self.lock:
- return {
- "total_connections": self.connection_count,
- "available_connections": len(self.connections),
- "in_use_connections": len(self.in_use),
- "min_connections": self.min_connections,
- "max_connections": self.max_connections
- }
|