Tip: 此篇已加入.NET Core微服务基础系列文章索引
一、CAP简介
下面的文字来自CAP的Wiki文档:https://github.com/dotnetcore/CAP/wiki
CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。我们可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。
CAP 的应用场景主要有以下两个:
- 分布式事务中的最终一致性(异步确保)的方案
- 具有高可用性的 EventBus
CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,我们不需要具备 RabbitMQ 或者 Kafka 的使用经验,仍然可以轻松的将CAP集成到项目中。
CAP 目前支持使用 Sql Server,MySql,PostgreSql 数据库的项目;
CAP 同时支持使用 EntityFrameworkCore 和 Dapper 的项目,可以根据需要选择不同的配置方式;
CAP的作者为园友savorboard(杨晓东),成都地区的.NET社区领导者,棒棒哒!
二、案例结构
此次试验仍然和上一篇基于MassTransit的案例一样(其实是我懒得再改,直接拿来复用),共有四个MicroService应用程序,当用户下订单时会通过CAP作为事件总线发布消息,作为订阅者的库存和配送服务会接收到消息并消费消息。此次试验会采用RabbitMQ作为消息队列,采用MSSQL作为关系型数据库(同时CAP也是支持MSSQL的)。
准备工作:为所有服务通过NuGet安装CAP及其相关包
PM> Install-Package DotNetCore.CAP
下面是RabbitMQ的支持包
PM> Install-Package DotNetCore.CAP.RabbitMQ
下面是MSSQL的支持包
PM> Install-Package DotNetCore.CAP.SqlServer
三、具体实现
3.1 OrderService
(1)启动配置:这里主要需要给CAP指定数据库(它会在这个数据库中创建本地消息表Published和Received)以及使用到的消息队列(这里是RabbitMQ)
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); // Repository services.AddScoped<IOrderRepository, OrderRepository>(); // EF DbContext services.AddDbContext<OrderDbContext>(); // Dapper-ConnString services.AddSingleton(Configuration["DB:OrderDB"]); // CAP services.AddCap(x => { x.UseEntityFramework<OrderDbContext>(); // EF x.UseSqlServer(Configuration["DB:OrderDB"]); // SQL Server x.UseRabbitMQ(cfg => { cfg.HostName = Configuration["MQ:Host"]; cfg.VirtualHost = Configuration["MQ:VirtualHost"]; cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]); cfg.UserName = Configuration["MQ:UserName"]; cfg.Password = Configuration["MQ:Password"]; }); // RabbitMQ // Below settings is just for demo x.FailedRetryCount = 2; x.FailedRetryInterval = 5; }); ...... } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... app.UseMvc(); // CAP app.UseCap(); ...... }
(2)Controller:这里会调用Repository去实现业务逻辑和发送消息
[Route("api/Order")] public class OrderController : Controller { public IOrderRepository OrderRepository { get; } public OrderController(IOrderRepository OrderRepository) { this.OrderRepository = OrderRepository; } [HttpPost] public string Post([FromBody]OrderDTO orderDTO) { var result = OrderRepository.CreateOrderByDapper(orderDTO).GetAwaiter().GetResult(); return result ? "Post Order Success" : "Post Order Failed"; } }
(3)Repository:这里实现了两种方式:EF和Dapper(基于ADO.NET),其中EF方式中不需要传transaction(当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储),而基于ADO.NET方式中需要传transaction(由于不能获取到事务上下文,所以需要用户手动的传递事务上下文到CAP中)。
public class OrderRepository : IOrderRepository { public OrderDbContext DbContext { get; } public ICapPublisher CapPublisher { get; } public string ConnStr { get; } // For Dapper use public OrderRepository(OrderDbContext DbContext, ICapPublisher CapPublisher, string ConnStr) { this.DbContext = DbContext; this.CapPublisher = CapPublisher; this.ConnStr = ConnStr; } public async Task<bool> CreateOrderByEF(IOrder order) { using (var trans = DbContext.Database.BeginTransaction()) { var orderEntity = new Order() { ID = GenerateOrderID(), OrderUserID = order.OrderUserID, OrderTime = order.OrderTime, OrderItems = null, ProductID = order.ProductID // For demo use }; DbContext.Orders.Add(orderEntity); await DbContext.SaveChangesAsync(); // When using EF, no need to pass transaction var orderMessage = new OrderMessage() { ID = orderEntity.ID, OrderUserID = orderEntity.OrderUserID, OrderTime = orderEntity.OrderTime, OrderItems = null, ProductID = orderEntity.ProductID // For demo use }; await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage); trans.Commit(); } return true; } public async Task<bool> CreateOrderByDapper(IOrder order) { using (var conn = new SqlConnection(ConnStr)) { conn.Open(); using (var trans = conn.BeginTransaction()) { // business code here string sqlCommand = @"INSERT INTO [dbo].[Orders](OrderID, OrderTime, OrderUserID, ProductID) VALUES(@OrderID, @OrderTime, @OrderUserID, @ProductID)"; order.ID = GenerateOrderID(); await conn.ExecuteAsync(sqlCommand, param: new { OrderID = order.ID, OrderTime = DateTime.Now, OrderUserID = order.OrderUserID, ProductID = order.ProductID }, transaction: trans); // For Dapper/ADO.NET, need to pass transaction var orderMessage = new OrderMessage() { ID = order.ID, OrderUserID = order.OrderUserID, OrderTime = order.OrderTime, OrderItems = null, MessageTime = DateTime.Now, ProductID = order.ProductID // For demo use }; await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage, trans); trans.Commit(); } } return true; } private string GenerateOrderID() { // TODO: Some business logic to generate Order ID return Guid.NewGuid().ToString(); } private string GenerateEventID() { // TODO: Some business logic to generate Order ID return Guid.NewGuid().ToString(); } }
这里摘抄一段CAP wiki中关于事务的一段介绍:
事务在 CAP 具有重要作用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程中,如果不使用事务,我们是没有办法保证我们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,但是业务代码确执行失败。
这里的失败原因可能是多种多样的,比如连接异常,网络故障等等。
只有业务代码和CAP的Publish代码必须在同一个事务中,才能够保证业务代码和消息代码同时成功或者失败。
换句话说,CAP会确保我们这段逻辑中业务代码和消息代码都成功了,才会真正让事务commit。
3.2 StorageService
(1)启动配置:这里主要是指定Subscriber
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); // EF DbContext services.AddDbContext<StorageDbContext>(); // Dapper-ConnString services.AddSingleton(Configuration["DB:StorageDB"]); // Subscriber services.AddTransient<IOrderSubscriberService, OrderSubscriberService>(); // CAP services.AddCap(x => { x.UseEntityFramework<StorageDbContext>(); // EF x.UseSqlServer(Configuration["DB:StorageDB"]); // SQL Server x.UseRabbitMQ(cfg => { cfg.HostName = Configuration["MQ:Host"]; cfg.VirtualHost = Configuration["MQ:VirtualHost"]; cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]); cfg.UserName = Configuration["MQ:UserName"]; cfg.Password = Configuration["MQ:Password"]; }); // RabbitMQ // Below settings is just for demo x.FailedRetryCount = 2; x.FailedRetryInterval = 5; }); ...... } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... app.UseMvc(); // CAP app.UseCap(); ...... }
(2)实现Subscriber
首先定义一个接口,建议放到公共类库中
public interface IOrderSubscriberService { Task ConsumeOrderMessage(OrderMessage message); }
然后实现这个接口,记得让其实现ICapSubscribe接口,然后我们就可以使用 CapSubscribeAttribute
来订阅 CAP 发布出来的消息。
public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe { private readonly string _connStr; public OrderSubscriberService(string connStr) { _connStr = connStr; } [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)] public async Task ConsumeOrderMessage(OrderMessage message) { await Console.Out.WriteLineAsync($"[StorageService] Received message : {JsonHelper.SerializeObject(message)}"); await UpdateStorageNumberAsync(message); } private async Task<bool> UpdateStorageNumberAsync(OrderMessage order) { //throw new Exception("test"); // just for demo use using (var conn = new SqlConnection(_connStr)) { string sqlCommand = @"UPDATE [dbo].[Storages] SET StorageNumber = StorageNumber - 1 WHERE StorageID = @ProductID"; int count = await conn.ExecuteAsync(sqlCommand, param: new { ProductID = order.ProductID }); return count > 0; } } }
*.CAP约定消息端在方法实现的过程中需要实现幂等性,所谓幂等性就是指用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。这里我没有考虑,实际中需要首先进行验证,避免二次更新。
3.3 DeliveryService
(1)启动配置:与StorageService高度类似,只是使用的不是同一个数据库
(2)实现Subscriber
public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe { private readonly string _connStr; public OrderSubscriberService(string connStr) { _connStr = connStr; } [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)] public async Task ConsumeOrderMessage(OrderMessage message) { await Console.Out.WriteLineAsync($"[DeliveryService] Received message : {JsonHelper.SerializeObject(message)}"); await AddDeliveryRecordAsync(message); } private async Task<bool> AddDeliveryRecordAsync(OrderMessage order) { //throw new Exception("test"); // just for demo use using (var conn = new SqlConnection(_connStr)) { string sqlCommand = @"INSERT INTO [dbo].[Deliveries] (DeliveryID, OrderID, ProductID, OrderUserID, CreatedTime) VALUES (@DeliveryID, @OrderID, @ProductID, @OrderUserID, @CreatedTime)"; int count = await conn.ExecuteAsync(sqlCommand, param: new { DeliveryID = Guid.NewGuid().ToString(), OrderID = order.ID, OrderUserID = order.OrderUserID, ProductID = order.ProductID, CreatedTime = DateTime.Now }); return count > 0; } } }
3.4 快速测试
(1)启动3个微服务,Check 数据库表状态
首先会看到在各个数据库中均创建了本地消息表,这两个表的含义如下:
Cap.Published:这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用 ICapPublisher
接口 Publish 的消息内容。
Cap.Received:这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用 CapSubscribe[]
订阅的那些消息。
然后看看各个表的数据,目前只有库存表有数据,因为我们要做的只是更新。
(2)通过Postman发一个Post请求
(3)Check控制台输出的日志信息
(4)Check数据库中的业务表和消息表数据:可以看到发送者和接收者都执行成功了,如果其中任何一个参与者发生了异常或者连接不上,CAP会有默认的重试机制(默认是50次最大重试次数,每次重试间隔60s),当失败总次数达到默认失败总次数后,就不会进行重试了,我们可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。
另外,由于CAP会在数据库中创建消息表,因此难免会考虑到其性能。CAP提供了一个数据清理的机制,默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt (字段名)不为空并且小于当前时间的数据。
四、小结
本篇首先简单介绍了一下CAP这个开源项目,然后基于上一篇中的下订单的小案例来进行了基于CAP的改造,并通过一个实例的运行来看到了结果。当然,这个实例并不完美,很多点都没有考虑(比如消息端消费时的幂等性)和失败重试的场景实践等等等等。由于时间和精力的关系,目前只使用到这儿,以后有机会能够应用上会研究下CAP的源码,最后感谢杨晓东为.NET社区带来了一个优秀的开源项目!
示例代码
Click Here => 点我点我
参考资料
CAP - GitHub : https://github.com/dotnetcore/CAP
CAP - Wiki : https://github.com/dotnetcore/CAP/wiki
杨晓东,《BASE:一种ACID的替代方案》