| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- """
- MySQL 连接管理器
- 该文件提供 MySQL 数据库连接管理功能,支持:
- - 单例模式
- - 连接池管理
- - 基本 CRUD 操作
- - 事务支持
- - 连接错误处理
- """
- import pymysql
- from pymysql.cursors import DictCursor
- from typing import Any, List, Dict, Optional, Union
- from contextlib import contextmanager
- # 单例装饰器
- class singleton:
- def __init__(self, cls):
- self.cls = cls
- self._instance = None
-
- def __call__(self, *args, **kwargs):
- if self._instance is None:
- self._instance = self.cls(*args, **kwargs)
- return self._instance
- @singleton
- class MySQLConnection:
- """
- MySQL 连接管理器
- 支持:
- - 单例模式
- - 连接池管理
- - 基本 CRUD 操作
- - 事务支持
- - 连接错误处理
- """
-
- def __init__(self, host: str = "localhost", port: int = 3306,
- user: str = None, password: str = None,
- database: str = None, charset: str = "utf8mb4",
- pool_size: int = 5, **kwargs):
- """
- 初始化 MySQL 连接池
-
- Args:
- host: MySQL 主机地址
- port: MySQL 端口号
- user: MySQL 用户名
- password: MySQL 密码
- database: 数据库名称
- charset: 字符集
- pool_size: 连接池大小
- **kwargs: 其他 MySQL 连接参数
- """
- self.host = host
- self.port = port
- self.user = user
- self.password = password
- self.database = database
- self.charset = charset
- self.pool_size = pool_size
- self.kwargs = kwargs
-
- # 初始化连接池
- self._connection_pool = []
- self._init_connection_pool()
-
- def _init_connection_pool(self):
- """
- 初始化连接池
- """
- for _ in range(self.pool_size):
- conn = self._create_connection()
- if conn:
- self._connection_pool.append(conn)
-
- def _create_connection(self) -> Optional[pymysql.connections.Connection]:
- """
- 创建新的 MySQL 连接
-
- Returns:
- MySQL 连接对象,失败返回 None
- """
- try:
- conn = pymysql.connect(
- host=self.host,
- port=self.port,
- user=self.user,
- password=self.password,
- database=self.database,
- charset=self.charset,
- cursorclass=DictCursor,
- **self.kwargs
- )
- return conn
- except pymysql.Error as e:
- print(f"创建 MySQL 连接失败: {e}")
- return None
-
- def _get_connection(self) -> Optional[pymysql.connections.Connection]:
- """
- 从连接池获取连接
-
- Returns:
- MySQL 连接对象,失败返回 None
- """
- if self._connection_pool:
- return self._connection_pool.pop()
- else:
- # 连接池为空,创建新连接
- return self._create_connection()
-
- def _return_connection(self, conn: pymysql.connections.Connection):
- """
- 将连接返回连接池
-
- Args:
- conn: MySQL 连接对象
- """
- if len(self._connection_pool) < self.pool_size:
- self._connection_pool.append(conn)
- else:
- # 连接池已满,关闭连接
- conn.close()
-
- @contextmanager
- def get_cursor(self, cursorclass=DictCursor):
- """
- 获取游标上下文管理器
-
- Args:
- cursorclass: 游标类型
-
- Yields:
- MySQL 游标对象
- """
- conn = self._get_connection()
- if not conn:
- raise Exception("无法获取 MySQL 连接")
-
- try:
- cursor = conn.cursor(cursorclass=cursorclass)
- yield cursor
- conn.commit()
- except Exception as e:
- conn.rollback()
- raise e
- finally:
- cursor.close()
- self._return_connection(conn)
-
- def execute(self, sql: str, params: Union[List, Dict] = None) -> int:
- """
- 执行 SQL 语句(用于 INSERT、UPDATE、DELETE)
-
- Args:
- sql: SQL 语句
- params: SQL 参数
-
- Returns:
- 受影响的行数
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.rowcount
-
- def fetch_one(self, sql: str, params: Union[List, Dict] = None) -> Optional[Dict[str, Any]]:
- """
- 执行 SQL 查询,返回单行结果
-
- Args:
- sql: SQL 查询语句
- params: SQL 参数
-
- Returns:
- 查询结果字典,无结果返回 None
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.fetchone()
-
- def fetch_all(self, sql: str, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
- """
- 执行 SQL 查询,返回所有结果
-
- Args:
- sql: SQL 查询语句
- params: SQL 参数
-
- Returns:
- 查询结果列表
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.fetchall()
-
- def fetch_many(self, sql: str, size: int, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
- """
- 执行 SQL 查询,返回指定数量的结果
-
- Args:
- sql: SQL 查询语句
- size: 返回结果数量
- params: SQL 参数
-
- Returns:
- 查询结果列表
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.fetchmany(size)
-
- def bulk_insert(self, sql: str, params_list: List[Union[List, Dict]]) -> int:
- """
- 批量插入数据
-
- Args:
- sql: SQL 插入语句
- params_list: 参数列表
-
- Returns:
- 受影响的行数
- """
- with self.get_cursor() as cursor:
- cursor.executemany(sql, params_list)
- return cursor.rowcount
-
- def begin_transaction(self):
- """
- 开始事务
-
- Returns:
- 连接对象和游标对象
- """
- conn = self._get_connection()
- if not conn:
- raise Exception("无法获取 MySQL 连接")
-
- try:
- conn.begin()
- cursor = conn.cursor()
- return conn, cursor
- except Exception as e:
- self._return_connection(conn)
- raise e
-
- def commit_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
- """
- 提交事务
-
- Args:
- conn: 连接对象
- cursor: 游标对象
- """
- try:
- conn.commit()
- finally:
- cursor.close()
- self._return_connection(conn)
-
- def rollback_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
- """
- 回滚事务
-
- Args:
- conn: 连接对象
- cursor: 游标对象
- """
- try:
- conn.rollback()
- finally:
- cursor.close()
- self._return_connection(conn)
-
- def close(self):
- """
- 关闭连接池中的所有连接
- """
- for conn in self._connection_pool:
- try:
- conn.close()
- except Exception as e:
- print(f"关闭 MySQL 连接失败: {e}")
- self._connection_pool.clear()
- # 简化的接口函数,便于快速使用
- def get_mysql_conn(host: str = "localhost", port: int = 3306,
- user: str = None, password: str = None,
- database: str = None, charset: str = "utf8mb4",
- pool_size: int = 5, **kwargs) -> MySQLConnection:
- """
- 获取 MySQL 连接管理器实例
-
- Args:
- host: MySQL 主机地址
- port: MySQL 端口号
- user: MySQL 用户名
- password: MySQL 密码
- database: 数据库名称
- charset: 字符集
- pool_size: 连接池大小
- **kwargs: 其他 MySQL 连接参数
-
- Returns:
- MySQL 连接管理器实例
- """
- return MySQLConnection(host, port, user, password, database, charset, pool_size, **kwargs)
|