| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- """
- MySQL SQL 执行器
- 该文件提供 MySQL 数据库 SQL 执行功能,支持:
- - 单例模式
- - 基本 CRUD 操作
- - 事务支持
- - 连接错误处理
- - 全局客户端管理
- """
- import pymysql
- from pymysql.cursors import DictCursor
- from typing import Any, List, Dict, Optional, Union
- from contextlib import contextmanager
- from .mysql_pool import get_mysql_pool, MySQLPool
- from utils.decorators.singleton import singleton
- @singleton
- class MySQLConnection:
- """
- MySQL SQL 执行器
- 支持:
- - 单例模式
- - 基本 CRUD 操作
- - 事务支持
- - 连接错误处理
- """
-
- def __init__(self, mysql_pool: Optional[MySQLPool] = None,
- host: str = None, port: int = None,
- user: str = None, password: str = None,
- database: str = None, charset: str = None,
- pool_size: int = None, **kwargs):
- """
- 初始化 MySQL SQL 执行器
-
- Args:
- mysql_pool: 可选的 MySQL 连接池实例,如果提供则使用该实例,否则创建新实例
- host: MySQL 主机地址
- port: MySQL 端口号
- user: MySQL 用户名
- password: MySQL 密码
- database: 数据库名称
- charset: 字符集
- pool_size: 连接池大小
- **kwargs: 其他 MySQL 连接参数
- """
- # 如果提供了连接池实例,则使用该实例,否则创建新实例
- if mysql_pool:
- self._pool = mysql_pool
- else:
- self._pool = get_mysql_pool(host, port, user, password, database, charset, pool_size, **kwargs)
-
- def _get_connection(self) -> pymysql.connections.Connection:
- """
- 从连接池获取连接
-
- Returns:
- MySQL 连接对象
- """
- return self._pool.get_connection()
-
- @contextmanager
- def get_cursor(self, cursorclass=DictCursor):
- """
- 获取游标上下文管理器
-
- Args:
- cursorclass: 游标类型
-
- Yields:
- MySQL 游标对象
- """
- conn = self._get_connection()
- cursor = conn.cursor(cursorclass=cursorclass)
-
- try:
- yield cursor
- conn.commit()
- except Exception as e:
- conn.rollback()
- raise e
- finally:
- cursor.close()
- conn.close()
-
- def execute(self, sql: str, params: Union[List, Dict] = None) -> int:
- """
- 执行 SQL 语句(用于 INSERT、UPDATE、DELETE)
-
- Args:
- sql: SQL 语句
- params: SQL 参数
-
- Returns:
- 受影响的行数
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.rowcount
-
- def fetch_one(self, sql: str, params: Union[List, Dict] = None) -> Optional[Dict[str, Any]]:
- """
- 执行 SQL 查询,返回单行结果
-
- Args:
- sql: SQL 查询语句
- params: SQL 参数
-
- Returns:
- 查询结果字典,无结果返回 None
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.fetchone()
-
- def fetch_all(self, sql: str, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
- """
- 执行 SQL 查询,返回所有结果
-
- Args:
- sql: SQL 查询语句
- params: SQL 参数
-
- Returns:
- 查询结果列表
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.fetchall()
-
- def fetch_many(self, sql: str, size: int, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
- """
- 执行 SQL 查询,返回指定数量的结果
-
- Args:
- sql: SQL 查询语句
- size: 返回结果数量
- params: SQL 参数
-
- Returns:
- 查询结果列表
- """
- with self.get_cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.fetchmany(size)
-
- def bulk_insert(self, sql: str, params_list: List[Union[List, Dict]]) -> int:
- """
- 批量插入数据
-
- Args:
- sql: SQL 插入语句
- params_list: 参数列表
-
- Returns:
- 受影响的行数
- """
- with self.get_cursor() as cursor:
- cursor.executemany(sql, params_list)
- return cursor.rowcount
-
- def begin_transaction(self):
- """
- 开始事务
-
- Returns:
- 连接对象和游标对象
- """
- conn = self._get_connection()
- conn.begin()
- cursor = conn.cursor()
- return conn, cursor
-
- def commit_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
- """
- 提交事务
-
- Args:
- conn: 连接对象
- cursor: 游标对象
- """
- try:
- conn.commit()
- finally:
- cursor.close()
- conn.close()
-
- def rollback_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
- """
- 回滚事务
-
- Args:
- conn: 连接对象
- cursor: 游标对象
- """
- try:
- conn.rollback()
- finally:
- cursor.close()
- conn.close()
-
- def close(self):
- """
- 关闭 SQL 执行器
- """
- # 关闭连接池
- self._pool.close()
- # 简化的接口函数,便于快速使用
- def get_mysql_conn(host: str = None, port: int = None,
- user: str = None, password: str = None,
- database: str = None, charset: str = None,
- pool_size: int = None, **kwargs) -> MySQLConnection:
- """
- 获取 MySQL SQL 执行器实例
-
- Args:
- host: MySQL 主机地址
- port: MySQL 端口号
- user: MySQL 用户名
- password: MySQL 密码
- database: 数据库名称
- charset: 字符集
- pool_size: 连接池大小
- **kwargs: 其他 MySQL 连接参数
-
- Returns:
- MySQL SQL 执行器实例
- """
- return MySQLConnection(host=host, port=port, user=user, password=password,
- database=database, charset=charset, pool_size=pool_size, **kwargs)
- def get_mysql_conn_with_pool(mysql_pool: MySQLPool) -> MySQLConnection:
- """
- 使用指定的连接池获取 MySQL SQL 执行器实例
-
- Args:
- mysql_pool: MySQL 连接池实例
-
- Returns:
- MySQL SQL 执行器实例
- """
- return MySQLConnection(mysql_pool=mysql_pool)
|