| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- """
- MySQL 连接池配置
- 该文件提供 MySQL 数据库连接池配置功能,支持:
- - 单例模式
- - DBUtils 连接池管理
- - 连接错误处理
- - 从.env文件读取配置
- """
- import pymysql
- from pymysql.cursors import DictCursor
- from dbutils.pooled_db import PooledDB
- from conf.settings import mysql_settings
- # 单例装饰器
- class singleton:
- def __init__(self, cls):
- self.cls = cls
- self._instance = None
-
- def __call__(self, *args, **kwargs):
- if self._instance is None:
- self._instance = self.cls(*args, **kwargs)
- return self._instance
- @singleton
- class MySQLPool:
- """
- MySQL 连接池管理器
- 支持:
- - 单例模式
- - DBUtils 连接池管理
- - 连接错误处理
- """
-
- def __init__(self, host: str = None, port: int = None,
- user: str = None, password: str = None,
- database: str = None, charset: str = None,
- pool_size: int = None, **kwargs):
- """
- 初始化 MySQL 连接池
-
- Args:
- host: MySQL 主机地址
- port: MySQL 端口号
- user: MySQL 用户名
- password: MySQL 密码
- database: 数据库名称
- charset: 字符集
- pool_size: 连接池大小
- **kwargs: 其他 MySQL 连接参数
- """
- # 从环境变量读取配置,优先级:传入参数 > 环境变量 > 默认值
- self.host = host or mysql_settings.mysql_host
- self.port = int(port or mysql_settings.mysql_port)
- self.user = user or mysql_settings.mysql_user
- self.password = password or mysql_settings.mysql_password
- self.database = database or mysql_settings.mysql_database
- self.charset = charset or mysql_settings.mysql_charset
- self.pool_size = int(pool_size or mysql_settings.mysql_pool_size)
- self.kwargs = kwargs
-
- # 初始化 DBUtils 连接池
- self._pool = PooledDB(
- creator=pymysql,
- maxconnections=self.pool_size,
- mincached=2,
- maxcached=5,
- maxshared=3,
- blocking=True,
- maxusage=None,
- setsession=[],
- ping=0,
- host=self.host,
- port=self.port,
- user=self.user,
- password=self.password,
- database=self.database,
- charset=self.charset,
- cursorclass=DictCursor,
- **kwargs
- )
-
- def get_connection(self) -> pymysql.connections.Connection:
- """
- 从连接池获取连接
-
- Returns:
- MySQL 连接对象
- """
- return self._pool.connection()
-
- def close(self):
- """
- 关闭连接池
- """
- # DBUtils 连接池会自动管理连接,无需手动关闭
- pass
- # 简化的接口函数,便于快速使用
- def get_mysql_pool(host: str = None, port: int = None,
- user: str = None, password: str = None,
- database: str = None, charset: str = None,
- pool_size: int = None, **kwargs) -> MySQLPool:
- """
- 获取 MySQL 连接池实例
-
- Args:
- host: MySQL 主机地址
- port: MySQL 端口号
- user: MySQL 用户名
- password: MySQL 密码
- database: 数据库名称
- charset: 字符集
- pool_size: 连接池大小
- **kwargs: 其他 MySQL 连接参数
-
- Returns:
- MySQL 连接池实例
- """
- # 从环境变量读取默认值,与MySQLPool.__init__保持一致
- default_host = mysql_settings.mysql_host
- default_port = int(mysql_settings.mysql_port)
- default_user = mysql_settings.mysql_user
- default_database = mysql_settings.mysql_database
- default_charset = mysql_settings.mysql_charset
- default_pool_size = int(mysql_settings.mysql_pool_size)
-
- return MySQLPool(
- host=host or default_host,
- port=port or default_port,
- user=user or default_user,
- password=password,
- database=database or default_database,
- charset=charset or default_charset,
- pool_size=pool_size or default_pool_size,
- **kwargs
- )
|