.NET Core 事件总线,分布式事务解决方案:CAP

时间:2024-04-13 20:10:55


CAP 介绍

Github:https://github.com/dotnetcore/CAP

开源协议:MIT

CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

你可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

CAP 以 NuGet 包的形式提供,对项目无任何入侵,你仍然可以以你喜爱的方式来构建分布式系统。

CAP 具有 Event Bus 的所有功能,并且CAP提供了更加简化的方式来处理EventBus中的发布/订阅。

CAP 具有消息持久化的功能,也就是当你的服务进行重启或者宕机时,她可以保证消息的可靠性。

CAP 实现了分布式事务中的最终一致性,你不用再去处理这些琐碎的细节。

CAP 提供了基于 Microsoft DI 的 API 服务,她可以和你的 ASP.NET Core 系统进行无缝结合,并且能够和你的业务代码集成支持强一致性的事务处理。

CAP 是开源免费的。CAP基于MIT协议开源,你可以免费的在你的私人或者商业项目中使用,不会有人向你收取任何费用。

Getting Started

目前, CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,你不需要具备 RabbitMQ 或者 Kafka 的使用经验,仍然可以轻松的集成到项目中。

CAP 目前支持使用 MS Sql Server 数据库的项目,其他数据库正在支持中...

CAP 同时支持使用 EntityFrameworkCore 和 Dapper 的项目,你可以根据需要选择不同的配置方式。

下面是CAP在系统中的一个不完全示意图:

.NET Core 事件总线,分布式事务解决方案:CAP

图中实线部分代表用户代码,虚线部分代表CAP内部实现。

下面,我们看一下 CAP 怎么集成到项目中:

Step 1:

你可以运行下面的命令来安装CAP NuGet 包:

PM> Install-Package DotNetCore.CAP

根据底层消息队列,你可以选择引入不同的包:

// 如果你使用的是 KafkaPM> Install-Package DotNetCore.CAP.Kafka// 如果你使用的是 RabbitMQPM> Install-Package DotNetCore.CAP.RabbitMQ

CAP 目前支持使用 SQL Server 的项目,你需要引入:

PM> Install-Package DotNetCore.CAP.SqlServer

Step 2:

在 Startup.cs 文件中,添加如下配置:

public void ConfigureServices(IServiceCollection services){
    ......
    
    services.AddDbContext<AppDbContext>();
    
    services.AddCap(x =>
    {        // 如果你的 SqlServer 使用的 EF 进行数据操作,你需要添加如下配置:
        // 注意: 你不需要再次配置 x.UseSqlServer(""")
        x.UseEntityFramework<AppDbContext>();        
        // 如果你使用的Dapper,你需要添加如下配置:
        x.UseSqlServer("数据库连接字符串");    
        // 如果你使用的 RabbitMQ 作为MQ,你需要添加如下配置:
        x.UseRabbitMQ("localhost");    
        //如果你使用的 Kafka 作为MQ,你需要添加如下配置:
        x.UseKafka("localhost:9092");
    });
}public void Configure(IApplicationBuilder app){
    .....    
    // 添加 CAP
    app.UseCap();
}

发布事件/消息

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 进行消息发布:

public class PublishController : Controller{    private readonly ICapPublisher _publisher;    
    public PublishController(ICapPublisher publisher)    {
        _publisher = publisher;
    }
    
    [Route("~/checkAccountWithTrans")]    public async Task<IActionResult> PublishMessageWithTransaction([FromServices]AppDbContext dbContext)    {         using (var trans = dbContext.Database.BeginTransaction())
         {            //指定发送的消息标题(供订阅)和内容
            await _publisher.PublishAsync("xxx.services.account.check",                new Person { Name = "Foo", Age = 11 });            // 你的业务代码。
            trans.Commit();
         }        return Ok();
    }
}

订阅事件/消息

在 Controller 中:
如果是在Controller中,直接添加[CapSubscribe("")] 来订阅相关消息。

public class PublishController : Controller{
    [NoAction]
    [CapSubscribe("xxx.services.account.check")]    public async Task CheckReceivedMessage(Person person)    {
        Console.WriteLine(person.Name);
        Console.WriteLine(person.Age);     
        return Task.CompletedTask;
    }
}

在 xxxService中:
如果你的方法没有位于Controller 中,那么你订阅的类需要继承 ICapSubscribe,然后添加[CapSubscribe("")]标记:

namespace xxx.Service{    public interface ISubscriberService
    {        public void CheckReceivedMessage(Person person);
    }    
    
    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
        [CapSubscribe("xxx.services.account.check")]        public void CheckReceivedMessage(Person person)        {
            
        }
    }
}

然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService 类

public void ConfigureServices(IServiceCollection services){
    services.AddTransient<ISubscriberService,SubscriberService>();
}

结束了,怎么样,是不是很简单?