mysql_conn.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. """
  2. MySQL SQL 执行器
  3. 该文件提供 MySQL 数据库 SQL 执行功能,支持:
  4. - 单例模式
  5. - 基本 CRUD 操作
  6. - 事务支持
  7. - 连接错误处理
  8. - 全局客户端管理
  9. """
  10. import pymysql
  11. from pymysql.cursors import DictCursor
  12. from typing import Any, List, Dict, Optional, Union
  13. from contextlib import contextmanager
  14. from .mysql_pool import get_mysql_pool, MySQLPool
  15. from utils.decorators.singleton import singleton
  16. @singleton
  17. class MySQLConnection:
  18. """
  19. MySQL SQL 执行器
  20. 支持:
  21. - 单例模式
  22. - 基本 CRUD 操作
  23. - 事务支持
  24. - 连接错误处理
  25. """
  26. def __init__(self, mysql_pool: Optional[MySQLPool] = None,
  27. host: str = None, port: int = None,
  28. user: str = None, password: str = None,
  29. database: str = None, charset: str = None,
  30. pool_size: int = None, **kwargs):
  31. """
  32. 初始化 MySQL SQL 执行器
  33. Args:
  34. mysql_pool: 可选的 MySQL 连接池实例,如果提供则使用该实例,否则创建新实例
  35. host: MySQL 主机地址
  36. port: MySQL 端口号
  37. user: MySQL 用户名
  38. password: MySQL 密码
  39. database: 数据库名称
  40. charset: 字符集
  41. pool_size: 连接池大小
  42. **kwargs: 其他 MySQL 连接参数
  43. """
  44. # 如果提供了连接池实例,则使用该实例,否则创建新实例
  45. if mysql_pool:
  46. self._pool = mysql_pool
  47. else:
  48. self._pool = get_mysql_pool(host, port, user, password, database, charset, pool_size, **kwargs)
  49. def _get_connection(self) -> pymysql.connections.Connection:
  50. """
  51. 从连接池获取连接
  52. Returns:
  53. MySQL 连接对象
  54. """
  55. return self._pool.get_connection()
  56. @contextmanager
  57. def get_cursor(self, cursorclass=DictCursor):
  58. """
  59. 获取游标上下文管理器
  60. Args:
  61. cursorclass: 游标类型
  62. Yields:
  63. MySQL 游标对象
  64. """
  65. conn = self._get_connection()
  66. cursor = conn.cursor(cursorclass=cursorclass)
  67. try:
  68. yield cursor
  69. conn.commit()
  70. except Exception as e:
  71. conn.rollback()
  72. raise e
  73. finally:
  74. cursor.close()
  75. conn.close()
  76. def execute(self, sql: str, params: Union[List, Dict] = None) -> int:
  77. """
  78. 执行 SQL 语句(用于 INSERT、UPDATE、DELETE)
  79. Args:
  80. sql: SQL 语句
  81. params: SQL 参数
  82. Returns:
  83. 受影响的行数
  84. """
  85. with self.get_cursor() as cursor:
  86. cursor.execute(sql, params)
  87. return cursor.rowcount
  88. def fetch_one(self, sql: str, params: Union[List, Dict] = None) -> Optional[Dict[str, Any]]:
  89. """
  90. 执行 SQL 查询,返回单行结果
  91. Args:
  92. sql: SQL 查询语句
  93. params: SQL 参数
  94. Returns:
  95. 查询结果字典,无结果返回 None
  96. """
  97. with self.get_cursor() as cursor:
  98. cursor.execute(sql, params)
  99. return cursor.fetchone()
  100. def fetch_all(self, sql: str, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
  101. """
  102. 执行 SQL 查询,返回所有结果
  103. Args:
  104. sql: SQL 查询语句
  105. params: SQL 参数
  106. Returns:
  107. 查询结果列表
  108. """
  109. with self.get_cursor() as cursor:
  110. cursor.execute(sql, params)
  111. return cursor.fetchall()
  112. def fetch_many(self, sql: str, size: int, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
  113. """
  114. 执行 SQL 查询,返回指定数量的结果
  115. Args:
  116. sql: SQL 查询语句
  117. size: 返回结果数量
  118. params: SQL 参数
  119. Returns:
  120. 查询结果列表
  121. """
  122. with self.get_cursor() as cursor:
  123. cursor.execute(sql, params)
  124. return cursor.fetchmany(size)
  125. def bulk_insert(self, sql: str, params_list: List[Union[List, Dict]]) -> int:
  126. """
  127. 批量插入数据
  128. Args:
  129. sql: SQL 插入语句
  130. params_list: 参数列表
  131. Returns:
  132. 受影响的行数
  133. """
  134. with self.get_cursor() as cursor:
  135. cursor.executemany(sql, params_list)
  136. return cursor.rowcount
  137. def begin_transaction(self):
  138. """
  139. 开始事务
  140. Returns:
  141. 连接对象和游标对象
  142. """
  143. conn = self._get_connection()
  144. conn.begin()
  145. cursor = conn.cursor()
  146. return conn, cursor
  147. def commit_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
  148. """
  149. 提交事务
  150. Args:
  151. conn: 连接对象
  152. cursor: 游标对象
  153. """
  154. try:
  155. conn.commit()
  156. finally:
  157. cursor.close()
  158. conn.close()
  159. def rollback_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
  160. """
  161. 回滚事务
  162. Args:
  163. conn: 连接对象
  164. cursor: 游标对象
  165. """
  166. try:
  167. conn.rollback()
  168. finally:
  169. cursor.close()
  170. conn.close()
  171. def close(self):
  172. """
  173. 关闭 SQL 执行器
  174. """
  175. # 关闭连接池
  176. self._pool.close()
  177. # 简化的接口函数,便于快速使用
  178. def get_mysql_conn(host: str = None, port: int = None,
  179. user: str = None, password: str = None,
  180. database: str = None, charset: str = None,
  181. pool_size: int = None, **kwargs) -> MySQLConnection:
  182. """
  183. 获取 MySQL SQL 执行器实例
  184. Args:
  185. host: MySQL 主机地址
  186. port: MySQL 端口号
  187. user: MySQL 用户名
  188. password: MySQL 密码
  189. database: 数据库名称
  190. charset: 字符集
  191. pool_size: 连接池大小
  192. **kwargs: 其他 MySQL 连接参数
  193. Returns:
  194. MySQL SQL 执行器实例
  195. """
  196. return MySQLConnection(host=host, port=port, user=user, password=password,
  197. database=database, charset=charset, pool_size=pool_size, **kwargs)
  198. def get_mysql_conn_with_pool(mysql_pool: MySQLPool) -> MySQLConnection:
  199. """
  200. 使用指定的连接池获取 MySQL SQL 执行器实例
  201. Args:
  202. mysql_pool: MySQL 连接池实例
  203. Returns:
  204. MySQL SQL 执行器实例
  205. """
  206. return MySQLConnection(mysql_pool=mysql_pool)