asp.net core集成CAP(分布式事务总线)

时间:2024-01-21 21:23:14

一、前言


感谢杨晓东大佬为社区贡献的CAP开源项目,传送门在此:.NET Core 事件总线,分布式事务解决方案:CAP 以及 如何在你的项目中集成 CAP【手把手视频教程】,之前也在工作中遇到分布式数据一致性的问题,也一直都是基于CAP理论和Base。

之前一直有关注杨老板的博客,直到今天才尝试一下CAP,发现好用,非常的棒,特此把CAP以组件化的方式引入到我的框架中。

二、CAP介绍


针对CAP介绍可以参考上面给出的两个链接。在此我只简单的说明一下:

CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

你可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

CAP 以 NuGet 包的形式提供,对项目无任何入侵,你仍然可以以你喜爱的方式来构建分布式系统。

CAP 具有 Event Bus 的所有功能,并且CAP提供了更加简化的方式来处理EventBus中的发布/订阅。

CAP 具有消息持久化的功能,也就是当你的服务进行重启或者宕机时,她可以保证消息的可靠性。

CAP 实现了分布式事务中的最终一致性,你不用再去处理这些琐碎的细节。

CAP 提供了基于 Microsoft DI 的 API 服务,她可以和你的 ASP.NET Core 系统进行无缝结合,并且能够和你的业务代码集成支持强一致性的事务处理。

 

三、asp.net core集成CAP


 由于我的框架是DDD六边形架构,为了解耦方便,我针对外部的工具都是以组件化的方式引入到项目中,即新建了一个CAP类库。

 

第一步:扩展了Startup类中的IServiceCollection,在CAP中我加入了Consul的注册,如下图:

public static void AddCAPConfigure(this IServiceCollection services, IConfiguration configuration)
        {
            services.AddCap(x =>
            {
                //使用Dapper ORM
                x.UseMySql(configuration.GetConnectionString("DBConnection"));

                //使用kafka 进行日志、case的消息推送
                //需要配置一下MQ地址,kafka放在linux系统上,不建议放在window上
                x.UseKafka(configuration["KafkaConfig"]);
                x.UseDashboard();//得到UI界面
                //注册服务发现
                x.UseDiscovery(d =>
                {
                    d.DiscoveryServerHostName = "192.168.161.163";
                    d.DiscoveryServerPort = 8500;
                    d.CurrentNodeHostName = "localhost";
                    d.CurrentNodePort = 64616;
                    d.NodeName = "CAP No.1 Node";
                });
            });
        }

 

我用的是mysql数据库,以及使用kafka消息队列,这边要注意,kafka最好部署在linux系统上,在windows系统会存在很多的坑,如果你觉得你的天坑能力强,可以尝试一下。

我这边也集成了 Consul服务注册,如果大家对cosnul感谢的兴趣的可以看我的另外一篇文章:实战中的asp.net core结合Consul集群&Docker实现服务治理 里面有讲解了consul集群部署。

 

好了然后我在我的主项目中配置一下,就开始用吧:

#region 配置CAP
            services.AddCAPConfigure(Configuration);
#endregion

 

 

第二步:在asp.net core webapi项目中新建一个控制器

配置如下:

[Route("api/[controller]/[action]")]
    public class ValuesController : Controller
    {
        private readonly ICapPublisher _capBus;
        public ValuesController(ICapPublisher capPublisher)
        {
            _capBus = capPublisher;
        }
        [HttpGet]
        public IActionResult Get()
        {
            _capBus.Publish("show.time", DateTime.Now);
            return Ok();
        }
        [HttpGet]
        [CapSubscribe("show.time")]
        public void CheckReceiveMessage(DateTime time)
        {
            Console.WriteLine(time.AddDays(1));
        }
    }

 

此处的mysql配置大家可自行补充,或者按照 @Savorboard  给出的demo操作即可。

 

第三步:部署一下kafka。

我在centos服务器上采用docker部署,命令如下:

//下载zookeeper
docker pull wurstmeister/zookeeper

//下载kafka
docker pull wurstmeister/kafka:2.11-0.11.0.3

 

//启动zookeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper

//启动kafka
docker run -d --name kafka --publish 9092:9092 \
--link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.161.163:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.161.163 \
--env KAFKA_ADVERTISED_PORT=9092 \
--volume /etc/localtime:/etc/localtime \
wurstmeister/kafka:2.11-0.11.0.3

 

部署完毕后就进入下一步运行啦。

第四步:运行项目,运行成功后,我嗯可以在数据库中发现cap会自动在数据库中创建两张表,一张是 发布信息表、一张是接收信息表。

 

 表:

 

 

发现 表中有数据存在:

数据体现法发送接收成功。

 

 

我们再来看看cap有提供的UI界面,发现里面有一个我们用consul注册的服务器。完美实现。

我们看一下consul集群:

 

 

四、总结

 欢迎大家积极尝试CAP,我也会在后续的项目中采用CAP,希望我们的社区越来越强大。


 

asp.net Core 交流群:787464275 欢迎加群交流
如果您认为这篇文章还不错或者有所收获,您可以点击右下角的【推荐】按钮精神支持,因为这种支持是我继续写作,分享的最大动力!

作者:LouieGuo
声明:原创博客请在转载时保留原文链接或者在文章开头加上本人博客地址,如发现错误,欢迎批评指正。凡是转载于本人的文章,不能设置打赏功能,如有特殊需求请与本人联系!

微信公众号:欢迎关注                                                 QQ技术交流群: 欢迎加群