from utils.mysql_conn import get_mysql_conn def test_mysql_connection(): """测试MySQL连接池""" print("开始测试MySQL连接池...") # 获取MySQL连接管理器实例 conn_manager = get_mysql_conn( host="localhost", port=3306, user="root", password="password", database="test" ) print("✓ MySQL连接管理器实例创建成功") # 测试execute方法 try: # 创建测试表 create_table_sql = """ CREATE TABLE IF NOT EXISTS test_users ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(50) NOT NULL, email VARCHAR(100) NOT NULL, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ rowcount = conn_manager.execute(create_table_sql) print(f"✓ 创建测试表成功,受影响行数: {rowcount}") # 插入测试数据 insert_sql = "INSERT INTO test_users (name, email) VALUES (%s, %s)" params = ("测试用户", "test@example.com") rowcount = conn_manager.execute(insert_sql, params) print(f"✓ 插入测试数据成功,受影响行数: {rowcount}") # 测试fetch_one方法 select_one_sql = "SELECT * FROM test_users ORDER BY id DESC LIMIT 1" result = conn_manager.fetch_one(select_one_sql) print(f"✓ 查询单条数据成功,结果: {result}") # 测试fetch_all方法 select_all_sql = "SELECT * FROM test_users" results = conn_manager.fetch_all(select_all_sql) print(f"✓ 查询所有数据成功,结果数量: {len(results)}") # 测试fetch_many方法 select_many_sql = "SELECT * FROM test_users" results = conn_manager.fetch_many(select_many_sql, size=2) print(f"✓ 查询多条数据成功,结果数量: {len(results)}") # 测试bulk_insert方法 bulk_insert_sql = "INSERT INTO test_users (name, email) VALUES (%s, %s)" bulk_params = [ ("批量用户1", "batch1@example.com"), ("批量用户2", "batch2@example.com"), ("批量用户3", "batch3@example.com") ] rowcount = conn_manager.bulk_insert(bulk_insert_sql, bulk_params) print(f"✓ 批量插入数据成功,受影响行数: {rowcount}") # 测试事务 try: conn, cursor = conn_manager.begin_transaction() cursor.execute("INSERT INTO test_users (name, email) VALUES (%s, %s)", ("事务用户", "transaction@example.com")) conn_manager.commit_transaction(conn, cursor) print("✓ 事务提交成功") except Exception as e: conn_manager.rollback_transaction(conn, cursor) print(f"✗ 事务回滚: {e}") # 清理测试数据 delete_sql = "DELETE FROM test_users" rowcount = conn_manager.execute(delete_sql) print(f"✓ 清理测试数据成功,受影响行数: {rowcount}") # 删除测试表 drop_table_sql = "DROP TABLE IF EXISTS test_users" rowcount = conn_manager.execute(drop_table_sql) print(f"✓ 删除测试表成功,受影响行数: {rowcount}") print("\n🎉 所有测试通过!MySQL连接池工作正常。") except Exception as e: print(f"\n✗ 测试失败: {e}") # 测试连接池关闭 conn_manager.close() print("✓ 连接池关闭成功") if __name__ == "__main__": test_mysql_connection()