mysql_pool.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. """
  2. MySQL 连接池配置
  3. 该文件提供 MySQL 数据库连接池配置功能,支持:
  4. - 单例模式
  5. - DBUtils 连接池管理
  6. - 连接错误处理
  7. - 从.env文件读取配置
  8. """
  9. import pymysql
  10. from pymysql.cursors import DictCursor
  11. from dbutils.pooled_db import PooledDB
  12. from conf.settings import mysql_settings
  13. # 单例装饰器
  14. class singleton:
  15. def __init__(self, cls):
  16. self.cls = cls
  17. self._instance = None
  18. def __call__(self, *args, **kwargs):
  19. if self._instance is None:
  20. self._instance = self.cls(*args, **kwargs)
  21. return self._instance
  22. @singleton
  23. class MySQLPool:
  24. """
  25. MySQL 连接池管理器
  26. 支持:
  27. - 单例模式
  28. - DBUtils 连接池管理
  29. - 连接错误处理
  30. """
  31. def __init__(self, host: str = None, port: int = None,
  32. user: str = None, password: str = None,
  33. database: str = None, charset: str = None,
  34. pool_size: int = None, **kwargs):
  35. """
  36. 初始化 MySQL 连接池
  37. Args:
  38. host: MySQL 主机地址
  39. port: MySQL 端口号
  40. user: MySQL 用户名
  41. password: MySQL 密码
  42. database: 数据库名称
  43. charset: 字符集
  44. pool_size: 连接池大小
  45. **kwargs: 其他 MySQL 连接参数
  46. """
  47. # 从环境变量读取配置,优先级:传入参数 > 环境变量 > 默认值
  48. self.host = host or mysql_settings.mysql_host
  49. self.port = int(port or mysql_settings.mysql_port)
  50. self.user = user or mysql_settings.mysql_user
  51. self.password = password or mysql_settings.mysql_password
  52. self.database = database or mysql_settings.mysql_database
  53. self.charset = charset or mysql_settings.mysql_charset
  54. self.pool_size = int(pool_size or mysql_settings.mysql_pool_size)
  55. self.kwargs = kwargs
  56. # 初始化 DBUtils 连接池
  57. self._pool = PooledDB(
  58. creator=pymysql,
  59. maxconnections=self.pool_size,
  60. mincached=2,
  61. maxcached=5,
  62. maxshared=3,
  63. blocking=True,
  64. maxusage=None,
  65. setsession=[],
  66. ping=0,
  67. host=self.host,
  68. port=self.port,
  69. user=self.user,
  70. password=self.password,
  71. database=self.database,
  72. charset=self.charset,
  73. cursorclass=DictCursor,
  74. **kwargs
  75. )
  76. def get_connection(self) -> pymysql.connections.Connection:
  77. """
  78. 从连接池获取连接
  79. Returns:
  80. MySQL 连接对象
  81. """
  82. return self._pool.connection()
  83. def close(self):
  84. """
  85. 关闭连接池
  86. """
  87. # DBUtils 连接池会自动管理连接,无需手动关闭
  88. pass
  89. # 简化的接口函数,便于快速使用
  90. def get_mysql_pool(host: str = None, port: int = None,
  91. user: str = None, password: str = None,
  92. database: str = None, charset: str = None,
  93. pool_size: int = None, **kwargs) -> MySQLPool:
  94. """
  95. 获取 MySQL 连接池实例
  96. Args:
  97. host: MySQL 主机地址
  98. port: MySQL 端口号
  99. user: MySQL 用户名
  100. password: MySQL 密码
  101. database: 数据库名称
  102. charset: 字符集
  103. pool_size: 连接池大小
  104. **kwargs: 其他 MySQL 连接参数
  105. Returns:
  106. MySQL 连接池实例
  107. """
  108. # 从环境变量读取默认值,与MySQLPool.__init__保持一致
  109. default_host = mysql_settings.mysql_host
  110. default_port = int(mysql_settings.mysql_port)
  111. default_user = mysql_settings.mysql_user
  112. default_database = mysql_settings.mysql_database
  113. default_charset = mysql_settings.mysql_charset
  114. default_pool_size = int(mysql_settings.mysql_pool_size)
  115. return MySQLPool(
  116. host=host or default_host,
  117. port=port or default_port,
  118. user=user or default_user,
  119. password=password,
  120. database=database or default_database,
  121. charset=charset or default_charset,
  122. pool_size=pool_size or default_pool_size,
  123. **kwargs
  124. )