# Infinity数据库连接池实现 from typing import Dict, Any, List, Optional import threading import time from contextlib import contextmanager from conf.settings import vector_db_settings class InfinityConnectionPool: """ Infinity数据库连接池 设计特点: 1. 支持并发连接管理 2. 连接自动回收和复用 3. 连接超时和心跳检测 4. 动态调整连接数量 5. 线程安全 """ def __init__( self, host: str = vector_db_settings.infinity_host, port: str = vector_db_settings.infinity_sdk_port, database: str = vector_db_settings.infinity_database, min_connections: int = 2, max_connections: int = 10, connection_timeout: int = 30, idle_timeout: int = 300, heartbeat_interval: int = 60 ): """ 初始化连接池 Args: host: Infinity服务地址 port: Infinity服务端口 database: 数据库名称 min_connections: 最小连接数 max_connections: 最大连接数 connection_timeout: 连接超时时间(秒) idle_timeout: 空闲连接超时时间(秒) heartbeat_interval: 心跳检测间隔(秒) """ self.host = host self.port = port self.database = database self.min_connections = min_connections self.max_connections = max_connections self.connection_timeout = connection_timeout self.idle_timeout = idle_timeout self.heartbeat_interval = heartbeat_interval # 连接池状态 self.connections = [] # 可用连接列表 self.in_use = {} # 正在使用的连接 {connection: thread_id} self.connection_count = 0 # 当前连接总数 # 线程安全锁 self.lock = threading.Lock() self.condition = threading.Condition(self.lock) # 初始化最小连接数 self._init_connections() # 启动心跳检测线程 self.heartbeat_thread = threading.Thread(target=self._heartbeat_check, daemon=True) self.heartbeat_thread.start() def _init_connections(self): """初始化最小连接数""" for _ in range(self.min_connections): self._create_connection() def _create_connection(self) -> Any: """创建新连接""" try: import infinity # 连接到Infinity服务 connection = infinity.connect( infinity.NetworkAddress(self.host, self.port) ) # 注意:根据官方API,RemoteThriftInfinityConnection对象没有use_database方法 # 数据库操作应该通过create_database、drop_database等方法直接指定数据库名称 # 或者通过获取Database对象后再进行操作 # 保存数据库名称,供后续操作使用 connection.__dict__['_database'] = self.database # 记录连接创建时间 connection.__dict__['_created_at'] = time.time() connection.__dict__['_last_used'] = time.time() connection.__dict__['_is_valid'] = True with self.lock: self.connections.append(connection) self.connection_count += 1 return connection except Exception as e: raise Exception(f"Failed to create Infinity connection: {e}") def _is_valid_connection(self, connection: Any) -> bool: """检查连接是否有效""" try: # 通过执行简单查询检查连接是否有效 connection.get_database(self.database) return True except Exception: return False def _heartbeat_check(self): """心跳检测,定期检查连接有效性并清理过期连接""" while True: time.sleep(self.heartbeat_interval) self._cleanup_connections() def _cleanup_connections(self): """清理无效或过期连接""" with self.lock: current_time = time.time() valid_connections = [] for connection in self.connections: # 检查连接是否过期 if current_time - connection.__dict__['_last_used'] > self.idle_timeout: # 关闭过期连接 try: connection.disconnect() self.connection_count -= 1 except Exception: pass elif not self._is_valid_connection(connection): # 关闭无效连接 try: connection.disconnect() self.connection_count -= 1 except Exception: pass else: valid_connections.append(connection) self.connections = valid_connections # 确保连接数不低于最小值 while self.connection_count < self.min_connections: self._create_connection() @contextmanager def get_connection(self, timeout: Optional[int] = None) -> Any: """ 获取一个连接,使用上下文管理器 Args: timeout: 获取连接的超时时间(秒) Yields: Infinity连接对象 """ connection = None try: connection = self.acquire(timeout) yield connection finally: if connection: self.release(connection) def acquire(self, timeout: Optional[int] = None) -> Any: """ 获取一个连接 Args: timeout: 获取连接的超时时间(秒) Returns: Infinity连接对象 Raises: TimeoutError: 获取连接超时 """ end_time = time.time() + (timeout or self.connection_timeout) with self.lock: while True: # 检查是否有可用连接 if self.connections: # 获取一个连接 connection = self.connections.pop() # 检查连接是否有效 if self._is_valid_connection(connection): connection.__dict__['_last_used'] = time.time() self.in_use[connection] = threading.get_ident() return connection else: # 连接无效,关闭并计数减一 try: connection.disconnect() self.connection_count -= 1 except Exception: pass # 检查是否可以创建新连接 elif self.connection_count < self.max_connections: # 创建新连接 connection = self._create_connection() connection.__dict__['_last_used'] = time.time() self.in_use[connection] = threading.get_ident() return connection # 等待连接释放 remaining = end_time - time.time() if remaining <= 0: raise TimeoutError("Timeout waiting for Infinity connection") # 等待连接释放或超时 self.condition.wait(remaining) def release(self, connection: Any): """ 释放连接 Args: connection: 要释放的连接 """ with self.lock: if connection in self.in_use: del self.in_use[connection] # 检查连接是否有效 if self._is_valid_connection(connection): connection.__dict__['_last_used'] = time.time() self.connections.append(connection) # 通知等待的线程 self.condition.notify() else: # 连接无效,关闭并计数减一 try: connection.disconnect() self.connection_count -= 1 except Exception: pass def close(self): """关闭所有连接""" with self.lock: # 关闭可用连接 for connection in self.connections: try: connection.disconnect() except Exception: pass # 关闭正在使用的连接 for connection in list(self.in_use.keys()): try: connection.disconnect() except Exception: pass self.connections = [] self.in_use = {} self.connection_count = 0 def get_status(self) -> Dict[str, Any]: """ 获取连接池状态 Returns: 连接池状态信息 """ with self.lock: return { "total_connections": self.connection_count, "available_connections": len(self.connections), "in_use_connections": len(self.in_use), "min_connections": self.min_connections, "max_connections": self.max_connections }