""" MySQL 连接管理器 该文件提供 MySQL 数据库连接管理功能,支持: - 单例模式 - 连接池管理 - 基本 CRUD 操作 - 事务支持 - 连接错误处理 """ import pymysql from pymysql.cursors import DictCursor from typing import Any, List, Dict, Optional, Union from contextlib import contextmanager # 单例装饰器 class singleton: def __init__(self, cls): self.cls = cls self._instance = None def __call__(self, *args, **kwargs): if self._instance is None: self._instance = self.cls(*args, **kwargs) return self._instance @singleton class MySQLConnection: """ MySQL 连接管理器 支持: - 单例模式 - 连接池管理 - 基本 CRUD 操作 - 事务支持 - 连接错误处理 """ def __init__(self, host: str = "localhost", port: int = 3306, user: str = None, password: str = None, database: str = None, charset: str = "utf8mb4", pool_size: int = 5, **kwargs): """ 初始化 MySQL 连接池 Args: host: MySQL 主机地址 port: MySQL 端口号 user: MySQL 用户名 password: MySQL 密码 database: 数据库名称 charset: 字符集 pool_size: 连接池大小 **kwargs: 其他 MySQL 连接参数 """ self.host = host self.port = port self.user = user self.password = password self.database = database self.charset = charset self.pool_size = pool_size self.kwargs = kwargs # 初始化连接池 self._connection_pool = [] self._init_connection_pool() def _init_connection_pool(self): """ 初始化连接池 """ for _ in range(self.pool_size): conn = self._create_connection() if conn: self._connection_pool.append(conn) def _create_connection(self) -> Optional[pymysql.connections.Connection]: """ 创建新的 MySQL 连接 Returns: MySQL 连接对象,失败返回 None """ try: conn = pymysql.connect( host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, charset=self.charset, cursorclass=DictCursor, **self.kwargs ) return conn except pymysql.Error as e: print(f"创建 MySQL 连接失败: {e}") return None def _get_connection(self) -> Optional[pymysql.connections.Connection]: """ 从连接池获取连接 Returns: MySQL 连接对象,失败返回 None """ if self._connection_pool: return self._connection_pool.pop() else: # 连接池为空,创建新连接 return self._create_connection() def _return_connection(self, conn: pymysql.connections.Connection): """ 将连接返回连接池 Args: conn: MySQL 连接对象 """ if len(self._connection_pool) < self.pool_size: self._connection_pool.append(conn) else: # 连接池已满,关闭连接 conn.close() @contextmanager def get_cursor(self, cursorclass=DictCursor): """ 获取游标上下文管理器 Args: cursorclass: 游标类型 Yields: MySQL 游标对象 """ conn = self._get_connection() if not conn: raise Exception("无法获取 MySQL 连接") try: cursor = conn.cursor(cursorclass=cursorclass) yield cursor conn.commit() except Exception as e: conn.rollback() raise e finally: cursor.close() self._return_connection(conn) def execute(self, sql: str, params: Union[List, Dict] = None) -> int: """ 执行 SQL 语句(用于 INSERT、UPDATE、DELETE) Args: sql: SQL 语句 params: SQL 参数 Returns: 受影响的行数 """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.rowcount def fetch_one(self, sql: str, params: Union[List, Dict] = None) -> Optional[Dict[str, Any]]: """ 执行 SQL 查询,返回单行结果 Args: sql: SQL 查询语句 params: SQL 参数 Returns: 查询结果字典,无结果返回 None """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.fetchone() def fetch_all(self, sql: str, params: Union[List, Dict] = None) -> List[Dict[str, Any]]: """ 执行 SQL 查询,返回所有结果 Args: sql: SQL 查询语句 params: SQL 参数 Returns: 查询结果列表 """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.fetchall() def fetch_many(self, sql: str, size: int, params: Union[List, Dict] = None) -> List[Dict[str, Any]]: """ 执行 SQL 查询,返回指定数量的结果 Args: sql: SQL 查询语句 size: 返回结果数量 params: SQL 参数 Returns: 查询结果列表 """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.fetchmany(size) def bulk_insert(self, sql: str, params_list: List[Union[List, Dict]]) -> int: """ 批量插入数据 Args: sql: SQL 插入语句 params_list: 参数列表 Returns: 受影响的行数 """ with self.get_cursor() as cursor: cursor.executemany(sql, params_list) return cursor.rowcount def begin_transaction(self): """ 开始事务 Returns: 连接对象和游标对象 """ conn = self._get_connection() if not conn: raise Exception("无法获取 MySQL 连接") try: conn.begin() cursor = conn.cursor() return conn, cursor except Exception as e: self._return_connection(conn) raise e def commit_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor): """ 提交事务 Args: conn: 连接对象 cursor: 游标对象 """ try: conn.commit() finally: cursor.close() self._return_connection(conn) def rollback_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor): """ 回滚事务 Args: conn: 连接对象 cursor: 游标对象 """ try: conn.rollback() finally: cursor.close() self._return_connection(conn) def close(self): """ 关闭连接池中的所有连接 """ for conn in self._connection_pool: try: conn.close() except Exception as e: print(f"关闭 MySQL 连接失败: {e}") self._connection_pool.clear() # 简化的接口函数,便于快速使用 def get_mysql_conn(host: str = "localhost", port: int = 3306, user: str = None, password: str = None, database: str = None, charset: str = "utf8mb4", pool_size: int = 5, **kwargs) -> MySQLConnection: """ 获取 MySQL 连接管理器实例 Args: host: MySQL 主机地址 port: MySQL 端口号 user: MySQL 用户名 password: MySQL 密码 database: 数据库名称 charset: 字符集 pool_size: 连接池大小 **kwargs: 其他 MySQL 连接参数 Returns: MySQL 连接管理器实例 """ return MySQLConnection(host, port, user, password, database, charset, pool_size, **kwargs)