mysql_conn.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. """
  2. MySQL 连接管理器
  3. 该文件提供 MySQL 数据库连接管理功能,支持:
  4. - 单例模式
  5. - 连接池管理
  6. - 基本 CRUD 操作
  7. - 事务支持
  8. - 连接错误处理
  9. """
  10. import pymysql
  11. from pymysql.cursors import DictCursor
  12. from typing import Any, List, Dict, Optional, Union
  13. from contextlib import contextmanager
  14. # 单例装饰器
  15. class singleton:
  16. def __init__(self, cls):
  17. self.cls = cls
  18. self._instance = None
  19. def __call__(self, *args, **kwargs):
  20. if self._instance is None:
  21. self._instance = self.cls(*args, **kwargs)
  22. return self._instance
  23. @singleton
  24. class MySQLConnection:
  25. """
  26. MySQL 连接管理器
  27. 支持:
  28. - 单例模式
  29. - 连接池管理
  30. - 基本 CRUD 操作
  31. - 事务支持
  32. - 连接错误处理
  33. """
  34. def __init__(self, host: str = "localhost", port: int = 3306,
  35. user: str = None, password: str = None,
  36. database: str = None, charset: str = "utf8mb4",
  37. pool_size: int = 5, **kwargs):
  38. """
  39. 初始化 MySQL 连接池
  40. Args:
  41. host: MySQL 主机地址
  42. port: MySQL 端口号
  43. user: MySQL 用户名
  44. password: MySQL 密码
  45. database: 数据库名称
  46. charset: 字符集
  47. pool_size: 连接池大小
  48. **kwargs: 其他 MySQL 连接参数
  49. """
  50. self.host = host
  51. self.port = port
  52. self.user = user
  53. self.password = password
  54. self.database = database
  55. self.charset = charset
  56. self.pool_size = pool_size
  57. self.kwargs = kwargs
  58. # 初始化连接池
  59. self._connection_pool = []
  60. self._init_connection_pool()
  61. def _init_connection_pool(self):
  62. """
  63. 初始化连接池
  64. """
  65. for _ in range(self.pool_size):
  66. conn = self._create_connection()
  67. if conn:
  68. self._connection_pool.append(conn)
  69. def _create_connection(self) -> Optional[pymysql.connections.Connection]:
  70. """
  71. 创建新的 MySQL 连接
  72. Returns:
  73. MySQL 连接对象,失败返回 None
  74. """
  75. try:
  76. conn = pymysql.connect(
  77. host=self.host,
  78. port=self.port,
  79. user=self.user,
  80. password=self.password,
  81. database=self.database,
  82. charset=self.charset,
  83. cursorclass=DictCursor,
  84. **self.kwargs
  85. )
  86. return conn
  87. except pymysql.Error as e:
  88. print(f"创建 MySQL 连接失败: {e}")
  89. return None
  90. def _get_connection(self) -> Optional[pymysql.connections.Connection]:
  91. """
  92. 从连接池获取连接
  93. Returns:
  94. MySQL 连接对象,失败返回 None
  95. """
  96. if self._connection_pool:
  97. return self._connection_pool.pop()
  98. else:
  99. # 连接池为空,创建新连接
  100. return self._create_connection()
  101. def _return_connection(self, conn: pymysql.connections.Connection):
  102. """
  103. 将连接返回连接池
  104. Args:
  105. conn: MySQL 连接对象
  106. """
  107. if len(self._connection_pool) < self.pool_size:
  108. self._connection_pool.append(conn)
  109. else:
  110. # 连接池已满,关闭连接
  111. conn.close()
  112. @contextmanager
  113. def get_cursor(self, cursorclass=DictCursor):
  114. """
  115. 获取游标上下文管理器
  116. Args:
  117. cursorclass: 游标类型
  118. Yields:
  119. MySQL 游标对象
  120. """
  121. conn = self._get_connection()
  122. if not conn:
  123. raise Exception("无法获取 MySQL 连接")
  124. try:
  125. cursor = conn.cursor(cursorclass=cursorclass)
  126. yield cursor
  127. conn.commit()
  128. except Exception as e:
  129. conn.rollback()
  130. raise e
  131. finally:
  132. cursor.close()
  133. self._return_connection(conn)
  134. def execute(self, sql: str, params: Union[List, Dict] = None) -> int:
  135. """
  136. 执行 SQL 语句(用于 INSERT、UPDATE、DELETE)
  137. Args:
  138. sql: SQL 语句
  139. params: SQL 参数
  140. Returns:
  141. 受影响的行数
  142. """
  143. with self.get_cursor() as cursor:
  144. cursor.execute(sql, params)
  145. return cursor.rowcount
  146. def fetch_one(self, sql: str, params: Union[List, Dict] = None) -> Optional[Dict[str, Any]]:
  147. """
  148. 执行 SQL 查询,返回单行结果
  149. Args:
  150. sql: SQL 查询语句
  151. params: SQL 参数
  152. Returns:
  153. 查询结果字典,无结果返回 None
  154. """
  155. with self.get_cursor() as cursor:
  156. cursor.execute(sql, params)
  157. return cursor.fetchone()
  158. def fetch_all(self, sql: str, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
  159. """
  160. 执行 SQL 查询,返回所有结果
  161. Args:
  162. sql: SQL 查询语句
  163. params: SQL 参数
  164. Returns:
  165. 查询结果列表
  166. """
  167. with self.get_cursor() as cursor:
  168. cursor.execute(sql, params)
  169. return cursor.fetchall()
  170. def fetch_many(self, sql: str, size: int, params: Union[List, Dict] = None) -> List[Dict[str, Any]]:
  171. """
  172. 执行 SQL 查询,返回指定数量的结果
  173. Args:
  174. sql: SQL 查询语句
  175. size: 返回结果数量
  176. params: SQL 参数
  177. Returns:
  178. 查询结果列表
  179. """
  180. with self.get_cursor() as cursor:
  181. cursor.execute(sql, params)
  182. return cursor.fetchmany(size)
  183. def bulk_insert(self, sql: str, params_list: List[Union[List, Dict]]) -> int:
  184. """
  185. 批量插入数据
  186. Args:
  187. sql: SQL 插入语句
  188. params_list: 参数列表
  189. Returns:
  190. 受影响的行数
  191. """
  192. with self.get_cursor() as cursor:
  193. cursor.executemany(sql, params_list)
  194. return cursor.rowcount
  195. def begin_transaction(self):
  196. """
  197. 开始事务
  198. Returns:
  199. 连接对象和游标对象
  200. """
  201. conn = self._get_connection()
  202. if not conn:
  203. raise Exception("无法获取 MySQL 连接")
  204. try:
  205. conn.begin()
  206. cursor = conn.cursor()
  207. return conn, cursor
  208. except Exception as e:
  209. self._return_connection(conn)
  210. raise e
  211. def commit_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
  212. """
  213. 提交事务
  214. Args:
  215. conn: 连接对象
  216. cursor: 游标对象
  217. """
  218. try:
  219. conn.commit()
  220. finally:
  221. cursor.close()
  222. self._return_connection(conn)
  223. def rollback_transaction(self, conn: pymysql.connections.Connection, cursor: pymysql.cursors.Cursor):
  224. """
  225. 回滚事务
  226. Args:
  227. conn: 连接对象
  228. cursor: 游标对象
  229. """
  230. try:
  231. conn.rollback()
  232. finally:
  233. cursor.close()
  234. self._return_connection(conn)
  235. def close(self):
  236. """
  237. 关闭连接池中的所有连接
  238. """
  239. for conn in self._connection_pool:
  240. try:
  241. conn.close()
  242. except Exception as e:
  243. print(f"关闭 MySQL 连接失败: {e}")
  244. self._connection_pool.clear()
  245. # 简化的接口函数,便于快速使用
  246. def get_mysql_conn(host: str = "localhost", port: int = 3306,
  247. user: str = None, password: str = None,
  248. database: str = None, charset: str = "utf8mb4",
  249. pool_size: int = 5, **kwargs) -> MySQLConnection:
  250. """
  251. 获取 MySQL 连接管理器实例
  252. Args:
  253. host: MySQL 主机地址
  254. port: MySQL 端口号
  255. user: MySQL 用户名
  256. password: MySQL 密码
  257. database: 数据库名称
  258. charset: 字符集
  259. pool_size: 连接池大小
  260. **kwargs: 其他 MySQL 连接参数
  261. Returns:
  262. MySQL 连接管理器实例
  263. """
  264. return MySQLConnection(host, port, user, password, database, charset, pool_size, **kwargs)