.NetCore实战——工作单元模式(UnitOfWork):管理好你的事务

·

2 min read

工作单元模式有如下几个特性:

1、使用同一上下文

2、跟踪实体的状态

3、保障事务一致性

我们对实体的操作,最终的状态都是应该如实保存到我们的存储中,进行持久化

接下来看一下代码

为了实现工作单元模式,这里定义了一个工作单元的接口

public interface IUnitOfWork : IDisposable
{
    Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
    Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default);
}

这两个方法的区别是:一个是返回的 int 是指我们影响的数据条数,另外一个返回 bool 表示我们保存是否成功,本质上这两个方法达到的效果是相同的。

另外还定义了一个事务管理的接口

public interface ITransaction
{
    // 获取当前事务
    IDbContextTransaction GetCurrentTransaction();

    // 判断当前事务是否开启
    bool HasActiveTransaction { get; }

    // 开启事务
    Task<IDbContextTransaction> BeginTransactionAsync();

    // 提交事务
    Task CommitTransactionAsync(IDbContextTransaction transaction);

    // 事务回滚
    void RollbackTransaction();
}

在实现上我们是借助 EF 来实现工作单元模式的

看一下 EFContext 的定义

/// <summary>
/// DbContext 是 EF 的基类,然后实现了 UnitOfWork 的接口和事务的接口
/// </summary>
public class EFContext : DbContext, IUnitOfWork, ITransaction
{
    protected IMediator _mediator;
    ICapPublisher _capBus;

    // 后面的章节会详细讲到这两个参数
    public EFContext(DbContextOptions options, IMediator mediator, ICapPublisher capBus) : base(options)
    {
        _mediator = mediator;
        _capBus = capBus;
    }

    #region IUnitOfWork

    public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
    {
        var result = await base.SaveChangesAsync(cancellationToken);
        //await _mediator.DispatchDomainEventsAsync(this);
        return true;
    }

    //// 可以看到这个方法实际上与上面的方法是相同的,所以这个方法可以不实现
    //public override Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = default)
    //{
    //    return base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
    //}

    #endregion

    #region ITransaction

    private IDbContextTransaction _currentTransaction;// 把当前的事务用一个字段存储

    public IDbContextTransaction GetCurrentTransaction() => _currentTransaction;// 获取当前的事务就是返回存储的私有对象

    public bool HasActiveTransaction => _currentTransaction != null;// 事务是否开启是判断当前这个事务是否为空

    /// <summary>
    /// 开启事务
    /// </summary>
    /// <returns></returns>
    public Task<IDbContextTransaction> BeginTransactionAsync()
    {
        if (_currentTransaction != null) return null;
        _currentTransaction = Database.BeginTransaction(_capBus, autoCommit: false);
        return Task.FromResult(_currentTransaction);
    }

    /// <summary>
    /// 提交事务
    /// </summary>
    /// <param name="transaction">当前事务</param>
    /// <returns></returns>
    public async Task CommitTransactionAsync(IDbContextTransaction transaction)
    {
        if (transaction == null) throw new ArgumentNullException(nameof(transaction));
        if (transaction != _currentTransaction) throw new InvalidOperationException($"Transaction {transaction.TransactionId} is not current");

        try
        {
            await SaveChangesAsync();// 将当前所有的变更都保存到数据库
            transaction.Commit();
        }
        catch
        {
            RollbackTransaction();
            throw;
        }
        finally
        {
            if (_currentTransaction != null)
            {
                // 最终需要把当前事务进行释放,并且置为空
                // 这样就可以多次的开启事务和提交事务
                _currentTransaction.Dispose();
                _currentTransaction = null;
            }
        }
    }

    /// <summary>
    /// 回滚
    /// </summary>
    public void RollbackTransaction()
    {
        try
        {
            _currentTransaction?.Rollback();
        }
        finally
        {
            if (_currentTransaction != null)
            {
                _currentTransaction.Dispose();
                _currentTransaction = null;
            }
        }
    }

    #endregion
}

另外一个我们还是需要关注的一点就是如何管理我们的事务

这里有一个类 TransactionBehavior,这个类是用来注入我们的事务的管理过程的,具体它是怎么工作的在后续的章节会讲到,这里先关注它的实现过程

public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFContext
{
    ILogger _logger;
    TDbContext _dbContext;
    ICapPublisher _capBus;
    public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger)
    {
        _dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
        _capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }


    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
    {
        var response = default(TResponse);
        var typeName = request.GetGenericTypeName();

        try
        {
            // 首先判断当前是否有开启事务
            if (_dbContext.HasActiveTransaction)
            {
                return await next();
            }

            // 定义了一个数据库操作执行的策略,比如说可以在里面嵌入一些重试的逻辑,这里创建了一个默认的策略
            var strategy = _dbContext.Database.CreateExecutionStrategy();

            await strategy.ExecuteAsync(async () =>
            {
                Guid transactionId;
                using (var transaction = await _dbContext.BeginTransactionAsync())
                using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
                {
                    _logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);

                    response = await next();// next 实际上是指我们的后续操作,这里的模式有点像之前讲的中间件模式

                    _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);


                    await _dbContext.CommitTransactionAsync(transaction);

                    transactionId = transaction.TransactionId;
                }
            });

            return response;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);

            throw;
        }
    }
}

这里实现里在执行命令之前判断事务是否开启,如果事务开启的话继续执行后面的逻辑,如果事务没有开启,先开启事务,再执行后面的逻辑。

回过头来看一下我们的 EFContext,EFContext 实现 IUnitOfWork,工作单元模式的核心,它实现了事务的管理和工作单元模式,我们就可以借助 EFContext 来实现我们的仓储层