# pool.py
# Infinity database connection pool implementation.
  2. from typing import Dict, Any, List, Optional
  3. import threading
  4. import time
  5. from contextlib import contextmanager
  6. from conf.settings import vector_db_settings
  7. class InfinityConnectionPool:
  8. """
  9. Infinity数据库连接池
  10. 设计特点:
  11. 1. 支持并发连接管理
  12. 2. 连接自动回收和复用
  13. 3. 连接超时和心跳检测
  14. 4. 动态调整连接数量
  15. 5. 线程安全
  16. """
  17. def __init__(
  18. self,
  19. host: str = vector_db_settings.infinity_host,
  20. port: str = vector_db_settings.infinity_sdk_port,
  21. database: str = vector_db_settings.infinity_database,
  22. min_connections: int = 2,
  23. max_connections: int = 10,
  24. connection_timeout: int = 30,
  25. idle_timeout: int = 300,
  26. heartbeat_interval: int = 60
  27. ):
  28. """
  29. 初始化连接池
  30. Args:
  31. host: Infinity服务地址
  32. port: Infinity服务端口
  33. database: 数据库名称
  34. min_connections: 最小连接数
  35. max_connections: 最大连接数
  36. connection_timeout: 连接超时时间(秒)
  37. idle_timeout: 空闲连接超时时间(秒)
  38. heartbeat_interval: 心跳检测间隔(秒)
  39. """
  40. self.host = host
  41. self.port = port
  42. self.database = database
  43. self.min_connections = min_connections
  44. self.max_connections = max_connections
  45. self.connection_timeout = connection_timeout
  46. self.idle_timeout = idle_timeout
  47. self.heartbeat_interval = heartbeat_interval
  48. # 连接池状态
  49. self.connections = [] # 可用连接列表
  50. self.in_use = {} # 正在使用的连接 {connection: thread_id}
  51. self.connection_count = 0 # 当前连接总数
  52. # 线程安全锁
  53. self.lock = threading.Lock()
  54. self.condition = threading.Condition(self.lock)
  55. # 初始化最小连接数
  56. self._init_connections()
  57. # 启动心跳检测线程
  58. self.heartbeat_thread = threading.Thread(target=self._heartbeat_check, daemon=True)
  59. self.heartbeat_thread.start()
  60. def _init_connections(self):
  61. """初始化最小连接数"""
  62. for _ in range(self.min_connections):
  63. # 初始化时需要获取锁,因为_create_connection现在没有内部锁
  64. with self.lock:
  65. self._create_connection()
  66. def _create_connection(self) -> Any:
  67. """创建新连接"""
  68. try:
  69. import infinity
  70. # 连接到Infinity服务
  71. connection = infinity.connect(
  72. infinity.NetworkAddress(self.host, self.port)
  73. )
  74. # 注意:根据官方API,RemoteThriftInfinityConnection对象没有use_database方法
  75. # 数据库操作应该通过create_database、drop_database等方法直接指定数据库名称
  76. # 或者通过获取Database对象后再进行操作
  77. # 保存数据库名称,供后续操作使用
  78. connection.__dict__['_database'] = self.database
  79. # 记录连接创建时间
  80. connection.__dict__['_created_at'] = time.time()
  81. connection.__dict__['_last_used'] = time.time()
  82. connection.__dict__['_is_valid'] = True
  83. # 注意:这里不需要再获取锁,因为调用此方法时已经在acquire方法中持有了锁
  84. self.connections.append(connection)
  85. self.connection_count += 1
  86. return connection
  87. except Exception as e:
  88. raise Exception(f"Failed to create Infinity connection: {e}")
  89. def _is_valid_connection(self, connection: Any) -> bool:
  90. """检查连接是否有效"""
  91. try:
  92. # 通过执行简单查询检查连接是否有效
  93. # 注意:这里不应该在持有锁的情况下执行网络操作
  94. # 但由于此方法是在锁内被调用的,我们需要尽量减少操作时间
  95. connection.get_database(self.database)
  96. return True
  97. except Exception:
  98. return False
  99. def _heartbeat_check(self):
  100. """心跳检测,定期检查连接有效性并清理过期连接"""
  101. while True:
  102. time.sleep(self.heartbeat_interval)
  103. self._cleanup_connections()
  104. def _cleanup_connections(self):
  105. """清理无效或过期连接"""
  106. with self.lock:
  107. current_time = time.time()
  108. valid_connections = []
  109. for connection in self.connections:
  110. # 检查连接是否过期
  111. if current_time - connection.__dict__['_last_used'] > self.idle_timeout:
  112. # 关闭过期连接
  113. try:
  114. connection.disconnect()
  115. self.connection_count -= 1
  116. except Exception:
  117. pass
  118. elif not self._is_valid_connection(connection):
  119. # 关闭无效连接
  120. try:
  121. connection.disconnect()
  122. self.connection_count -= 1
  123. except Exception:
  124. pass
  125. else:
  126. valid_connections.append(connection)
  127. self.connections = valid_connections
  128. # 确保连接数不低于最小值
  129. while self.connection_count < self.min_connections:
  130. self._create_connection()
  131. @contextmanager
  132. def get_connection(self, timeout: Optional[int] = None) -> Any:
  133. """
  134. 获取一个连接,使用上下文管理器
  135. Args:
  136. timeout: 获取连接的超时时间(秒)
  137. Yields:
  138. Infinity连接对象
  139. """
  140. connection = None
  141. try:
  142. connection = self.acquire(timeout)
  143. yield connection
  144. finally:
  145. if connection:
  146. self.release(connection)
  147. def acquire(self, timeout: Optional[int] = None) -> Any:
  148. """
  149. 获取一个连接
  150. Args:
  151. timeout: 获取连接的超时时间(秒)
  152. Returns:
  153. Infinity连接对象
  154. Raises:
  155. TimeoutError: 获取连接超时
  156. """
  157. end_time = time.time() + (timeout or self.connection_timeout)
  158. with self.lock:
  159. while True:
  160. # 检查是否有可用连接
  161. if self.connections:
  162. # 获取一个连接
  163. connection = self.connections.pop()
  164. # 检查连接是否有效
  165. if self._is_valid_connection(connection):
  166. connection.__dict__['_last_used'] = time.time()
  167. self.in_use[connection] = threading.get_ident()
  168. return connection
  169. else:
  170. # 连接无效,关闭并计数减一
  171. try:
  172. connection.disconnect()
  173. self.connection_count -= 1
  174. except Exception:
  175. pass
  176. # 检查是否可以创建新连接
  177. elif self.connection_count < self.max_connections:
  178. # 创建新连接
  179. connection = self._create_connection()
  180. connection.__dict__['_last_used'] = time.time()
  181. self.in_use[connection] = threading.get_ident()
  182. return connection
  183. # 等待连接释放
  184. remaining = end_time - time.time()
  185. if remaining <= 0:
  186. raise TimeoutError("Timeout waiting for Infinity connection")
  187. # 等待连接释放或超时
  188. self.condition.wait(remaining)
  189. def release(self, connection: Any):
  190. """
  191. 释放连接
  192. Args:
  193. connection: 要释放的连接
  194. """
  195. with self.lock:
  196. if connection in self.in_use:
  197. del self.in_use[connection]
  198. # 检查连接是否有效
  199. if self._is_valid_connection(connection):
  200. connection.__dict__['_last_used'] = time.time()
  201. self.connections.append(connection)
  202. # 通知等待的线程
  203. self.condition.notify()
  204. else:
  205. # 连接无效,关闭并计数减一
  206. try:
  207. connection.disconnect()
  208. self.connection_count -= 1
  209. except Exception:
  210. pass
  211. def close(self):
  212. """关闭所有连接"""
  213. with self.lock:
  214. # 关闭可用连接
  215. for connection in self.connections:
  216. try:
  217. connection.disconnect()
  218. except Exception:
  219. pass
  220. # 关闭正在使用的连接
  221. for connection in list(self.in_use.keys()):
  222. try:
  223. connection.disconnect()
  224. except Exception:
  225. pass
  226. self.connections = []
  227. self.in_use = {}
  228. self.connection_count = 0
  229. def get_status(self) -> Dict[str, Any]:
  230. """
  231. 获取连接池状态
  232. Returns:
  233. 连接池状态信息
  234. """
  235. with self.lock:
  236. return {
  237. "total_connections": self.connection_count,
  238. "available_connections": len(self.connections),
  239. "in_use_connections": len(self.in_use),
  240. "min_connections": self.min_connections,
  241. "max_connections": self.max_connections
  242. }