学习ORMX的事务处理功能,包括事务基础概念、ACID特性、隔离级别、ITransaction接口、手动事务管理、保存点和实际应用场景。 关键字:事务处理, ACID特性, 隔离级别, ITransaction接口, 保存点, 嵌套事务
第十章:事务处理
10.1 事务基础概念
10.1.1 什么是事务
事务(Transaction)是数据库操作的基本单位,它是一个逻辑工作单元,包含一系列操作。这些操作要么全部成功,要么全部失败。
10.1.2 ACID 特性
事务具有四个核心特性(ACID):
- 原子性(Atomicity):事务中的所有操作要么全部成功,要么全部失败
- 一致性(Consistency):事务执行前后,数据库从一个一致性状态变换到另一个一致性状态
- 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务
- 持久性(Durability):事务一旦提交,其结果就是永久性的
10.1.3 为什么需要事务
在以下场景中,必须使用事务:
- 银行转账:从一个账户扣款,向另一个账户加款
- 订单处理:创建订单、扣减库存、记录日志
- 批量导入:插入大量数据,要么全部成功,要么全部失败
- 多表操作:涉及多个表的更新操作
10.2 事务隔离级别
10.2.1 隔离级别类型
.NET 提供了以下事务隔离级别:
public enum IsolationLevel
{
Unspecified, // 未指定
Chaos, // 混乱
ReadUncommitted, // 读未提交
ReadCommitted, // 读已提交(默认)
RepeatableRead, // 可重复读
Serializable, // 可串行化
Snapshot // 快照
}
10.2.2 隔离级别说明
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 说明 |
|---|---|---|---|---|
| ReadUncommitted | 可能 | 可能 | 可能 | 最低隔离级别,性能最好 |
| ReadCommitted | 不可能 | 可能 | 可能 | 默认级别,平衡性能和一致性 |
| RepeatableRead | 不可能 | 不可能 | 可能 | 防止不可重复读 |
| Serializable | 不可能 | 不可能 | 不可能 | 最高隔离级别,性能最差 |
| Snapshot | 不可能 | 不可能 | 不可能 | 使用快照技术,性能较好 |
10.2.3 选择隔离级别
// 读已提交(默认)
transaction.Begin(IsolationLevel.ReadCommitted);
// 可重复读
transaction.Begin(IsolationLevel.RepeatableRead);
// 可串行化
transaction.Begin(IsolationLevel.Serializable);
10.3 ITransaction 接口
10.3.1 接口定义
public interface ITransaction : IDisposable
{
/// <summary>
/// 开始事务
/// </summary>
/// <param name="isolationLevel">隔离级别</param>
void Begin(IsolationLevel isolationLevel = IsolationLevel.ReadCommitted);
/// <summary>
/// 提交事务
/// </summary>
void Commit();
/// <summary>
/// 回滚事务
/// </summary>
void Rollback();
/// <summary>
/// 创建保存点
/// </summary>
/// <param name="name">保存点名称</param>
void Save(string name);
/// <summary>
/// 回滚到保存点
/// </summary>
/// <param name="name">保存点名称</param>
void RollbackTo(string name);
}
10.3.2 Transaction 类
using JCode.ORMX.Abstractions;
using JCode.ORMX.Core;
// 创建事务
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 执行操作
userTable.Insert(user);
orderTable.Insert(order);
// 提交事务
transaction.Commit();
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
Console.WriteLine(<div class="latex">$"事务失败:{ex.Message}");
}
10.4 手动事务管理
10.4.1 基本事务操作
using JCode.ORMX.Abstractions;
using JCode.ORMX.Core;
// 创建事务
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 执行操作
userTable.Insert(user1);
userTable.Insert(user2);
userTable.Insert(user3);
// 提交事务
transaction.Commit();
Console.WriteLine("事务提交成功");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
Console.WriteLine($</div>"事务回滚:{ex.Message}");
}
10.4.2 嵌套事务
// 外层事务
using var outerTransaction = new Transaction(tableManager);
outerTransaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 执行操作
userTable.Insert(user1);
// 内层事务
using var innerTransaction = new Transaction(tableManager);
innerTransaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 执行操作
userTable.Insert(user2);
userTable.Insert(user3);
// 提交内层事务
innerTransaction.Commit();
}
catch (Exception ex)
{
// 回滚内层事务
innerTransaction.Rollback();
Console.WriteLine(<div class="latex">$"内层事务回滚:{ex.Message}");
}
// 提交外层事务
outerTransaction.Commit();
Console.WriteLine("外层事务提交成功");
}
catch (Exception ex)
{
// 回滚外层事务
outerTransaction.Rollback();
Console.WriteLine($</div>"外层事务回滚:{ex.Message}");
}
10.5 保存点(Save Point)
10.5.1 创建保存点
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 操作 1
userTable.Insert(user1);
// 创建保存点
transaction.Save("point1");
// 操作 2
userTable.Insert(user2);
// 操作 3
userTable.Insert(user3);
// 提交事务
transaction.Commit();
}
catch (Exception ex)
{
// 回滚到保存点
transaction.RollbackTo("point1");
Console.WriteLine(<div class="latex">$"回滚到保存点:{ex.Message}");
}
10.5.2 多个保存点
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 操作 1
userTable.Insert(user1);
transaction.Save("point1");
// 操作 2
userTable.Insert(user2);
transaction.Save("point2");
// 操作 3
userTable.Insert(user3);
transaction.Save("point3");
// 操作 4
userTable.Insert(user4);
// 提交事务
transaction.Commit();
}
catch (Exception ex)
{
// 根据错误类型回滚到不同的保存点
if (ex is InvalidOperationException)
{
transaction.RollbackTo("point2");
}
else
{
transaction.RollbackTo("point1");
}
Console.WriteLine($</div>"回滚到保存点:{ex.Message}");
}
10.6 实际应用场景
10.6.1 银行转账
using System;
using JCode.ORMX.Attributes;
using JCode.ORMX.DbProvider;
using JCode.ORMX.Core;
using Microsoft.Data.Sqlite;
[Table(Name = "Accounts")]
public class Account
{
[Column(IsPrimaryKey = true)]
public int Id { get; set; }
public string Name { get; set; }
public decimal Balance { get; set; }
}
class Program
{
static void Main(string[] args)
{
// 创建数据库提供程序(推荐方式)
using var provider = new SqliteDatabaseProvider("Data Source=:memory:");
var tableManager = provider.GetTableManager();
// 初始化账户
var accountTable = tableManager.Table<Account>();
accountTable.InsertAll(
new Account { Id = 1, Name = "张三", Balance = 1000 },
new Account { Id = 2, Name = "李四", Balance = 500 }
);
// 转账操作
Console.WriteLine("=== 开始转账 ===");
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 转账金额
decimal amount = 200;
// 扣款
var fromAccount = accountTable.Where(a => a.Id == 1).FirstOrDefault();
if (fromAccount.Balance < amount)
{
throw new InvalidOperationException("余额不足");
}
fromAccount.Balance -= amount;
accountTable.Update(fromAccount);
// 加款
var toAccount = accountTable.Where(a => a.Id == 2).FirstOrDefault();
toAccount.Balance += amount;
accountTable.Update(toAccount);
// 提交事务
transaction.Commit();
Console.WriteLine("转账成功");
// 查询余额
var newFromAccount = accountTable.Where(a => a.Id == 1).FirstOrDefault();
var newToAccount = accountTable.Where(a => a.Id == 2).FirstOrDefault();
Console.WriteLine(<div class="latex">$"张三余额:{newFromAccount.Balance}");
Console.WriteLine($</div>"李四余额:{newToAccount.Balance}");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
Console.WriteLine(<div class="latex">$"转账失败:{ex.Message}");
// 查询余额(应该保持不变)
var fromAccount = accountTable.Where(a => a.Id == 1).FirstOrDefault();
var toAccount = accountTable.Where(a => a.Id == 2).FirstOrDefault();
Console.WriteLine($</div>"张三余额:{fromAccount.Balance}");
Console.WriteLine(<div class="latex">$"李四余额:{toAccount.Balance}");
}
}
}
10.6.2 订单处理
[Table(Name = "Orders")]
public class Order
{
[Column(IsPrimaryKey = true, IsAutoIncrement = true)]
public int Id { get; set; }
public int UserId { get; set; }
public decimal Amount { get; set; }
public DateTime CreatedAt { get; set; }
}
[Table(Name = "OrderItems")]
public class OrderItem
{
[Column(IsPrimaryKey = true, IsAutoIncrement = true)]
public int Id { get; set; }
public int OrderId { get; set; }
public int ProductId { get; set; }
public int Quantity { get; set; }
public decimal Price { get; set; }
}
[Table(Name = "Products")]
public class Product
{
[Column(IsPrimaryKey = true)]
public int Id { get; set; }
public string Name { get; set; }
public int Stock { get; set; }
public decimal Price { get; set; }
}
class OrderService
{
private readonly ITableManager _tableManager;
public OrderService(ITableManager tableManager)
{
_tableManager = tableManager;
}
public void CreateOrder(int userId, List<(int productId, int quantity)> items)
{
using var transaction = new Transaction(_tableManager);
transaction.Begin(IsolationLevel.Serializable);
try
{
using var productTable = _tableManager.Table<Product>();
using var orderTable = _tableManager.Table<Order>();
using var orderItemTable = _tableManager.Table<OrderItem>();
// 1. 创建订单
var order = new Order
{
UserId = userId,
Amount = 0,
CreatedAt = DateTime.Now
};
order = orderTable.Insert(order);
// 2. 创建订单项并扣减库存
foreach (var (productId, quantity) in items)
{
// 检查库存
var product = productTable.Where(p => p.Id == productId).FirstOrDefault();
if (product == null)
{
throw new InvalidOperationException($</div>"商品 {productId} 不存在");
}
if (product.Stock < quantity)
{
throw new InvalidOperationException(<div class="latex">$"商品 {product.Name} 库存不足");
}
// 扣减库存
product.Stock -= quantity;
productTable.Update(product);
// 创建订单项
var orderItem = new OrderItem
{
OrderId = order.Id,
ProductId = productId,
Quantity = quantity,
Price = product.Price
};
orderItemTable.Insert(orderItem);
// 计算订单金额
order.Amount += product.Price * quantity;
}
// 3. 更新订单金额
orderTable.Update(order);
// 4. 提交事务
transaction.Commit();
Console.WriteLine($</div>"订单 {order.Id} 创建成功,金额:{order.Amount}");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
Console.WriteLine(<div class="latex">$"订单创建失败:{ex.Message}");
throw;
}
}
}
10.6.3 批量导入
class ImportService
{
private readonly ITableManager _tableManager;
public ImportService(ITableManager tableManager)
{
_tableManager = tableManager;
}
public void ImportUsers(List<User> users)
{
using var transaction = new Transaction(_tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
using var userTable = _tableManager.Table<User>();
// 批量插入
userTable.InsertAll(users);
// 提交事务
transaction.Commit();
Console.WriteLine($</div>"成功导入 {users.Count} 个用户");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
Console.WriteLine(<div class="latex">$"导入失败:{ex.Message}");
throw;
}
}
}
10.7 事务最佳实践
10.7.1 保持事务简短
// 推荐:事务只包含必要的数据库操作
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
userTable.Insert(user);
orderTable.Insert(order);
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
throw;
}
// 不推荐:事务中包含非数据库操作
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
userTable.Insert(user);
// 不应该在事务中进行的操作
SendEmail(user.Email, "欢迎注册");
orderTable.Insert(order);
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
throw;
}
10.7.2 使用适当的隔离级别
// 读操作:使用 ReadCommitted
transaction.Begin(IsolationLevel.ReadCommitted);
// 写操作:使用 RepeatableRead
transaction.Begin(IsolationLevel.RepeatableRead);
// 关键操作:使用 Serializable
transaction.Begin(IsolationLevel.Serializable);
10.7.3 始终处理异常
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 执行操作
userTable.Insert(user);
orderTable.Insert(order);
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
Console.WriteLine($</div>"事务失败:{ex.Message}");
throw;
}
10.7.4 使用 using 语句
// 推荐:使用 using 语句
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 执行操作
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
throw;
}
// 不推荐:手动管理资源
var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
// 执行操作
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
throw;
}
finally
{
transaction.Dispose();
}
10.7.5 MongoDB 事务注意事项
MongoDB 事务支持:
- 副本集模式:MongoDB 4.0+ 支持在副本集模式下使用事务
- 分片集群模式:MongoDB 4.2+ 支持在分片集群模式下使用事务
- Standalone 模式:不支持事务,ORMX 会自动使用模拟回滚
MongoDB 事务内部机制:
ORMX 内部使用 MongoDB 驱动的会话(Session)和事务 API 实现事务支持:
- 会话创建:事务开始时,ORMX 会创建一个 MongoDB 会话
- 事务检测:自动检测 MongoDB 实例是否支持事务
- 会话传播:将会话传递给所有相关的数据库操作
- 事务提交/回滚:根据操作结果提交或回滚事务
- 资源管理:自动管理会话和事务资源
详细使用示例:
using JCode.ORMX.DbProvider.MongoDB;
// 创建 MongoDB 提供程序(使用副本集连接)
using var provider = new MongoDBProvider("mongodb://host1:27017,host2:27017,host3:27017/?replicaSet=myReplicaSet", "mydb");
var tableManager = provider.GetTableManager();
// 创建事务
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
var userTable = tableManager.Table<User>();
var orderTable = tableManager.Table<Order>();
// 插入用户
var user = new User { Name = "张三", Email = "zhangsan@example.com", Age = 25 };
user = userTable.Insert(user);
// 插入订单
var order = new Order { UserId = user.Id, Amount = 100, CreatedAt = DateTime.Now };
order = orderTable.Insert(order);
// 提交事务
transaction.Commit();
Console.WriteLine("事务提交成功");
Console.WriteLine(<div class="latex">$"创建用户:{user.Name},订单ID:{order.Id}");
}
catch (Exception ex)
{
// 回滚事务
transaction.Rollback();
Console.WriteLine($</div>"事务失败:{ex.Message}");
}
MongoDB 事务限制:
- 写操作限制:单个事务最多支持 1000 个写操作
- 执行时间:事务执行时间不应超过 60 秒
- 操作范围:事务中的操作应尽可能简短
- 资源消耗:事务会占用更多的服务器资源
- 并发性能:长时间运行的事务会影响并发性能
- 网络开销:事务需要额外的网络往返
模拟回滚机制:
当 MongoDB 不支持事务时(如 standalone 模式),ORMX 会自动使用模拟回滚:
- 记录操作:跟踪事务中执行的所有插入操作
- 回滚处理:当需要回滚时,删除所有插入的文档
- 限制:对于更新和删除操作,无法完全回滚
- 适用场景:主要用于测试环境和非关键操作
MongoDB 事务最佳实践:
- 使用副本集:生产环境务必使用 MongoDB 副本集,获得完整事务支持
- 保持简短:事务操作应在 1-2 秒内完成
- 批量操作:将多个相关操作合并到一个事务中
- 错误处理:始终捕获并处理事务异常
- 资源管理:使用
using语句管理事务资源 - 监控性能:定期监控事务执行时间和成功率
- 合理设计:避免在事务中进行网络调用或复杂计算
事务隔离级别支持:
MongoDB 仅支持 ReadCommitted 隔离级别,即使在代码中指定其他隔离级别,ORMX 也会自动使用 ReadCommitted。
实际应用场景示例:
// MongoDB 事务 - 订单处理示例
public void ProcessOrder(int userId, List<OrderItemDto> items)
{
using var provider = new MongoDBProvider("mongodb://localhost:27017", "ecommerce");
var tableManager = provider.GetTableManager();
using var transaction = new Transaction(tableManager);
transaction.Begin(IsolationLevel.ReadCommitted);
try
{
var userTable = tableManager.Table<User>();
var productTable = tableManager.Table<Product>();
var orderTable = tableManager.Table<Order>();
var orderItemTable = tableManager.Table<OrderItem>();
// 验证用户
var user = userTable.Where(u => u.Id == userId).FirstOrDefault();
if (user == null)
{
throw new InvalidOperationException("用户不存在");
}
// 创建订单
var order = new Order
{
UserId = userId,
TotalAmount = 0,
Status = "Pending",
CreatedAt = DateTime.Now
};
order = orderTable.Insert(order);
// 处理订单项
decimal totalAmount = 0;
foreach (var item in items)
{
// 验证商品
var product = productTable.Where(p => p.Id == item.ProductId).FirstOrDefault();
if (product == null)
{
throw new InvalidOperationException(<div class="latex">$"商品 {item.ProductId} 不存在");
}
// 检查库存
if (product.Stock < item.Quantity)
{
throw new InvalidOperationException($</div>"商品 {product.Name} 库存不足");
}
// 扣减库存
product.Stock -= item.Quantity;
productTable.Update(product);
// 创建订单项
var orderItem = new OrderItem
{
OrderId = order.Id,
ProductId = item.ProductId,
Quantity = item.Quantity,
UnitPrice = product.Price,
Subtotal = product.Price * item.Quantity
};
orderItemTable.Insert(orderItem);
totalAmount += orderItem.Subtotal;
}
// 更新订单金额
order.TotalAmount = totalAmount;
orderTable.Update(order);
// 提交事务
transaction.Commit();
Console.WriteLine(<div class="latex">$"订单 {order.Id} 处理成功,总金额:{totalAmount}");
}
catch (Exception ex)
{
transaction.Rollback();
Console.WriteLine($</div>"订单处理失败:{ex.Message}");
throw;
}
}
性能优化建议:
- 批量操作:将多个小操作合并为批量操作
- 索引优化:为频繁查询的字段创建索引
- 连接池:合理配置 MongoDB 连接池
- 监控:使用 MongoDB 监控工具跟踪事务性能
- 错误重试:对临时性错误实现重试机制
- 事务大小:控制事务中操作的数量和数据量
总结
本章介绍了 ORMX 框架的事务处理功能,包括事务基础概念、ACID 特性、隔离级别、ITransaction 接口、手动事务管理、保存点和实际应用场景。事务是保证数据一致性的重要机制,ORMX 提供了完整的事务支持,包括嵌套事务和保存点功能。对于 MongoDB 数据库,ORMX 也提供了事务支持,但需要注意 MongoDB 的事务限制和最佳实践。合理使用事务功能,可以确保数据操作的原子性和一致性。
扩展思考
在高并发场景下,如何优化事务性能?是否需要使用乐观锁、悲观锁等技术?对于分布式事务,如何实现跨数据库的一致性?在微服务架构中,如何处理事务边界和最终一致性?这些问题值得在深入使用 ORMX 后进一步思考。