net core集成CAP

时间:2022-10-31 18:37:01

net core集成CAP

https://www.cnblogs.com/guolianyu/p/9756941.html

一、前言

感谢杨晓东大佬为社区贡献的CAP开源项目,传送门在此:.NET Core 事件总线,分布式事务解决方案:CAP 以及 如何在你的项目中集成 CAP【手把手视频教程】,之前也在工作中遇到分布式数据一致性的问题,也一直都是基于CAP理论和Base。

之前一直有关注杨老板的博客,直到今天才尝试一下CAP,发现好用,非常的棒,特此把CAP以组件化的方式引入到我的框架中。

二、CAP介绍

针对CAP介绍可以参考上面给出的两个链接。在此我只简单的说明一下:

CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

你可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

CAP 以 NuGet 包的形式提供,对项目无任何入侵,你仍然可以以你喜爱的方式来构建分布式系统。

CAP 具有 Event Bus 的所有功能,并且CAP提供了更加简化的方式来处理EventBus中的发布/订阅。

CAP 具有消息持久化的功能,也就是当你的服务进行重启或者宕机时,她可以保证消息的可靠性。

CAP 实现了分布式事务中的最终一致性,你不用再去处理这些琐碎的细节。

CAP 提供了基于 Microsoft DI 的 API 服务,她可以和你的 ASP.NET Core 系统进行无缝结合,并且能够和你的业务代码集成支持强一致性的事务处理。

三、asp.net core集成CAP

由于我的框架是DDD六边形架构,为了解耦方便,我针对外部的工具都是以组件化的方式引入到项目中,即新建了一个CAP类库。

第一步:扩展了Startup类中的IServiceCollection,在CAP中我加入了Consul的注册,如下图:

复制代码

public static void AddCAPConfigure(this IServiceCollection services, IConfiguration configuration)

{

services.AddCap(x =>

{

//使用Dapper ORM

x.UseMySql(configuration.GetConnectionString("DBConnection"));

            //使用kafka 进行日志、case的消息推送
//需要配置一下MQ地址,kafka放在linux系统上,不建议放在window上
x.UseKafka(configuration["KafkaConfig"]);
x.UseDashboard();//得到UI界面
//注册服务发现
x.UseDiscovery(d =>
{
d.DiscoveryServerHostName = "192.168.161.163";
d.DiscoveryServerPort = 8500;
d.CurrentNodeHostName = "localhost";
d.CurrentNodePort = 64616;
d.NodeName = "CAP No.1 Node";
});
});
}

复制代码

我用的是mysql数据库,以及使用kafka消息队列,这边要注意,kafka最好部署在linux系统上,在windows系统会存在很多的坑,如果你觉得你的天坑能力强,可以尝试一下。

我这边也集成了 Consul服务注册,如果大家对cosnul感谢的兴趣的可以看我的另外一篇文章:实战中的asp.net core结合Consul集群&Docker实现服务治理 里面有讲解了consul集群部署。

好了然后我在我的主项目中配置一下,就开始用吧:

region 配置CAP

        services.AddCAPConfigure(Configuration);

endregion

第二步:在asp.net core webapi项目中新建一个控制器

配置如下:

复制代码

[Route("api/[controller]/[action]")]

public class ValuesController : Controller

{

private readonly ICapPublisher _capBus;

public ValuesController(ICapPublisher capPublisher)

{

_capBus = capPublisher;

}

[HttpGet]

public IActionResult Get()

{

_capBus.Publish("show.time", DateTime.Now);

return Ok();

}

[HttpGet]

[CapSubscribe("show.time")]

public void CheckReceiveMessage(DateTime time)

{

Console.WriteLine(time.AddDays(1));

}

}

复制代码

此处的mysql配置大家可自行补充,或者按照 @Savorboard 给出的demo操作即可。

第三步:部署一下kafka。

我在centos服务器上采用docker部署,命令如下:

//下载zookeeper

docker pull wurstmeister/zookeeper

//下载kafka

docker pull wurstmeister/kafka:2.11-0.11.0.3

//启动zookeeper

docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper

//启动kafka

docker run -d --name kafka --publish 9092:9092

--link zookeeper

--env KAFKA_ZOOKEEPER_CONNECT=192.168.161.163:2181

--env KAFKA_ADVERTISED_HOST_NAME=192.168.161.163

--env KAFKA_ADVERTISED_PORT=9092

--volume /etc/localtime:/etc/localtime

wurstmeister/kafka:2.11-0.11.0.3

部署完毕后就进入下一步运行啦。

第四步:运行项目,运行成功后,我嗯可以在数据库中发现cap会自动在数据库中创建两张表,一张是 发布信息表、一张是接收信息表。

表:

发现 表中有数据存在:

数据体现法发送接收成功。

我们再来看看cap有提供的UI界面,发现里面有一个我们用consul注册的服务器。完美实现。

我们看一下consul集群:

四、总结

欢迎大家积极尝试CAP,我也会在后续的项目中采用CAP,希望我们的社区越来越强大。