test_mysql_conn.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from utils.mysql_conn import get_mysql_conn
  2. def test_mysql_connection():
  3. """测试MySQL连接池"""
  4. print("开始测试MySQL连接池...")
  5. # 获取MySQL连接管理器实例
  6. conn_manager = get_mysql_conn(
  7. host="localhost",
  8. port=3306,
  9. user="root",
  10. password="password",
  11. database="test"
  12. )
  13. print("✓ MySQL连接管理器实例创建成功")
  14. # 测试execute方法
  15. try:
  16. # 创建测试表
  17. create_table_sql = """
  18. CREATE TABLE IF NOT EXISTS test_users (
  19. id INT AUTO_INCREMENT PRIMARY KEY,
  20. name VARCHAR(50) NOT NULL,
  21. email VARCHAR(100) NOT NULL,
  22. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  23. )
  24. """
  25. rowcount = conn_manager.execute(create_table_sql)
  26. print(f"✓ 创建测试表成功,受影响行数: {rowcount}")
  27. # 插入测试数据
  28. insert_sql = "INSERT INTO test_users (name, email) VALUES (%s, %s)"
  29. params = ("测试用户", "test@example.com")
  30. rowcount = conn_manager.execute(insert_sql, params)
  31. print(f"✓ 插入测试数据成功,受影响行数: {rowcount}")
  32. # 测试fetch_one方法
  33. select_one_sql = "SELECT * FROM test_users ORDER BY id DESC LIMIT 1"
  34. result = conn_manager.fetch_one(select_one_sql)
  35. print(f"✓ 查询单条数据成功,结果: {result}")
  36. # 测试fetch_all方法
  37. select_all_sql = "SELECT * FROM test_users"
  38. results = conn_manager.fetch_all(select_all_sql)
  39. print(f"✓ 查询所有数据成功,结果数量: {len(results)}")
  40. # 测试fetch_many方法
  41. select_many_sql = "SELECT * FROM test_users"
  42. results = conn_manager.fetch_many(select_many_sql, size=2)
  43. print(f"✓ 查询多条数据成功,结果数量: {len(results)}")
  44. # 测试bulk_insert方法
  45. bulk_insert_sql = "INSERT INTO test_users (name, email) VALUES (%s, %s)"
  46. bulk_params = [
  47. ("批量用户1", "batch1@example.com"),
  48. ("批量用户2", "batch2@example.com"),
  49. ("批量用户3", "batch3@example.com")
  50. ]
  51. rowcount = conn_manager.bulk_insert(bulk_insert_sql, bulk_params)
  52. print(f"✓ 批量插入数据成功,受影响行数: {rowcount}")
  53. # 测试事务
  54. try:
  55. conn, cursor = conn_manager.begin_transaction()
  56. cursor.execute("INSERT INTO test_users (name, email) VALUES (%s, %s)", ("事务用户", "transaction@example.com"))
  57. conn_manager.commit_transaction(conn, cursor)
  58. print("✓ 事务提交成功")
  59. except Exception as e:
  60. conn_manager.rollback_transaction(conn, cursor)
  61. print(f"✗ 事务回滚: {e}")
  62. # 清理测试数据
  63. delete_sql = "DELETE FROM test_users"
  64. rowcount = conn_manager.execute(delete_sql)
  65. print(f"✓ 清理测试数据成功,受影响行数: {rowcount}")
  66. # 删除测试表
  67. drop_table_sql = "DROP TABLE IF EXISTS test_users"
  68. rowcount = conn_manager.execute(drop_table_sql)
  69. print(f"✓ 删除测试表成功,受影响行数: {rowcount}")
  70. print("\n🎉 所有测试通过!MySQL连接池工作正常。")
  71. except Exception as e:
  72. print(f"\n✗ 测试失败: {e}")
  73. # 测试连接池关闭
  74. conn_manager.close()
  75. print("✓ 连接池关闭成功")
  76. if __name__ == "__main__":
  77. test_mysql_connection()