""" MySQL SQL 执行器 该文件提供 MySQL 数据库 SQL 执行功能,支持: - 单例模式 - 基本 CRUD 操作 - 事务支持 - 连接错误处理 - 全局客户端管理 """ import pymysql from pymysql.cursors import DictCursor from typing import Any, List, Dict, Optional, Union from contextlib import contextmanager from .mysql_pool import get_mysql_pool, MySQLPool from utils.decorators.singleton import singleton @singleton class MySQLConnection: """ MySQL SQL 执行器 支持: - 单例模式 - 基本 CRUD 操作 - 事务支持 - 连接错误处理 """ def __init__(self, mysql_pool: Optional[MySQLPool] = None, host: str = None, port: int = None, user: str = None, password: str = None, database: str = None, charset: str = None, pool_size: int = None, **kwargs): """ 初始化 MySQL SQL 执行器 Args: mysql_pool: 可选的 MySQL 连接池实例,如果提供则使用该实例,否则创建新实例 host: MySQL 主机地址 port: MySQL 端口号 user: MySQL 用户名 password: MySQL 密码 database: 数据库名称 charset: 字符集 pool_size: 连接池大小 **kwargs: 其他 MySQL 连接参数 """ # 如果提供了连接池实例,则使用该实例,否则创建新实例 if mysql_pool: self._pool = mysql_pool else: self._pool = get_mysql_pool(host, port, user, password, database, charset, pool_size, **kwargs) def _get_connection(self) -> pymysql.connections.Connection: """ 从连接池获取连接 Returns: MySQL 连接对象 """ return self._pool.get_connection() @contextmanager def get_cursor(self, cursorclass=DictCursor): """ 获取游标上下文管理器 Args: cursorclass: 游标类型 Yields: MySQL 游标对象 """ conn = self._get_connection() cursor = conn.cursor(cursorclass=cursorclass) try: yield cursor conn.commit() except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() def execute(self, sql: str, params: Union[List, Dict] = None) -> int: """ 执行 SQL 语句(用于 INSERT、UPDATE、DELETE) Args: sql: SQL 语句 params: SQL 参数 Returns: 受影响的行数 """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.rowcount def fetch_one(self, sql: str, params: Union[List, Dict] = None) -> Optional[Dict[str, Any]]: """ 执行 SQL 查询,返回单行结果 Args: sql: SQL 查询语句 params: SQL 参数 Returns: 查询结果字典,无结果返回 None """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.fetchone() def fetch_all(self, sql: str, params: Union[List, Dict] = None) -> List[Dict[str, Any]]: """ 执行 SQL 查询,返回所有结果 Args: sql: SQL 查询语句 params: SQL 参数 Returns: 查询结果列表 """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.fetchall() def fetch_many(self, sql: str, size: int, params: Union[List, Dict] = None) -> List[Dict[str, Any]]: """ 执行 SQL 查询,返回指定数量的结果 Args: sql: SQL 查询语句 size: 返回结果数量 params: SQL 参数 Returns: 查询结果列表 """ with self.get_cursor() as cursor: cursor.execute(sql, params) return cursor.fetchmany(size) def bulk_insert(self, sql: str, params_list: List[Union[List, Dict]]) -> int: """ 批量插入数据 Args: sql: SQL 插入语句 params_list: 参数列表 Returns: 受影响的行数 """ with self.get_cursor() as cursor: cursor.executemany(sql, params_list) return cursor.rowcount def begin_transaction(self): """ 开始事务 Returns: 连接对象和游标对象 """ conn = self._get_connection() conn.begin() cursor = conn.cursor() return conn, cursor def commit_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor): """ 提交事务 Args: conn: 连接对象 cursor: 游标对象 """ try: conn.commit() finally: cursor.close() conn.close() def rollback_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor): """ 回滚事务 Args: conn: 连接对象 cursor: 游标对象 """ try: conn.rollback() finally: cursor.close() conn.close() def close(self): """ 关闭 SQL 执行器 """ # 关闭连接池 self._pool.close() # 简化的接口函数,便于快速使用 def get_mysql_conn(host: str = None, port: int = None, user: str = None, password: str = None, database: str = None, charset: str = None, pool_size: int = None, **kwargs) -> MySQLConnection: """ 获取 MySQL SQL 执行器实例 Args: host: MySQL 主机地址 port: MySQL 端口号 user: MySQL 用户名 password: MySQL 密码 database: 数据库名称 charset: 字符集 pool_size: 连接池大小 **kwargs: 其他 MySQL 连接参数 Returns: MySQL SQL 执行器实例 """ return MySQLConnection(host=host, port=port, user=user, password=password, database=database, charset=charset, pool_size=pool_size, **kwargs) def get_mysql_conn_with_pool(mysql_pool: MySQLPool) -> MySQLConnection: """ 使用指定的连接池获取 MySQL SQL 执行器实例 Args: mysql_pool: MySQL 连接池实例 Returns: MySQL SQL 执行器实例 """ return MySQLConnection(mysql_pool=mysql_pool)