在Asp.Net Core中集成Kafka

时间:2024-12-23 15:37:44

  在我们的业务中,我们通常需要在自己的业务子系统之间相互发送消息,一端去发送消息另一端去消费当前消息,这就涉及到使用消息队列MQ的一些内容,消息队列成熟的框架有多种,这里你可以读这篇文章来了解这些MQ的不同,这篇文章的主要目的是用来系统讲述如何在Asp.Net Core中使用Kafka,整篇文章将介绍如何写消息发送方代码、消费方代码、配套的工具的使用,希望读完这篇文章之后对整个消息的运行机制有一定的理解,在这里通过一张图来简要了解一下消息队列中的一些概念。

在Asp.Net Core中集成Kafka

图一 Kafka消息队列

  一 安装NUGET包

  在写代码之前首先要做的就是安装nuget包了,我们这里使用的是Confluent.Kafka 1.0.0-RC4版本,具体项目要根据具体的时间来确定引用包的版本,这些包可能更新比较快。

在Asp.Net Core中集成Kafka

图二 引用Kafka包依赖

  二 消息发送方(Producer)

  1 在项目中添加所有触发事件的接口 IIntegrationEvent,后面所有的触发事件都是继承自这个接口。

/// <summary>
/// 集成事件的接口定义
/// </summary>
public interface IIntegrationEvent {
string Key { get; set; }
}

  2 定义Kafka生产者

/// <summary>
/// Kafka 生产者的 Domain Service
/// </summary>
public class KafkaProducer : DomainService {
private readonly IConfiguration _config;
private readonly ILogger<KafkaProducer> _logger; public KafkaProducer(IConfiguration config,
ILogger<KafkaProducer> logger) {
_config = config;
_logger = logger;
} /// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
public void Produce(IIntegrationEvent @event) {
var topic = _config.GetValue<string>($"Kafka:Topics:{@event.GetType().Name}"); var producerConfig = new ProducerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")
}; var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build()) {
try {
var json = JsonConvert.SerializeObject(@event);
var dr = producer.ProduceAsync(topic, new Message<string, string> { Key = @event.Key, Value = json }).GetAwaiter().GetResult();
_logger.LogDebug("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
} catch (ProduceException<string, string> ex) {
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", topic, ex.Error.Reason);
}
}
}
}

  在这里我们的Producer根据业务的需要定义在领域服务中,这里面最关键的就是Produce方法了,该方法的参数是继承自IIntegrationEvent 接口的各种各样事件,在这个方法中,我们获取配置在appsetting.json中配置的各种Topic以及Kafka服务器的地址,具体的配置如下方截图所示。  

在Asp.Net Core中集成Kafka

图三 配置服务器地址以及各种Topic

  通过当前配置我们就知道我们的消息要发往何处,然后我们就可以创建一个producer来将我们的事件(实际上是定义的数据结构)序列化成Json,然后通过异步的方式发送出去,这里需要注意我们创建的Producer要放在一个using块中,这样在创建完成并发送消息之后就会释放当前生产者。这里如果发送失败会在当前日志中记录发送的值以及错误的原因从而便于进行调试。这里举出其中的一个事件RepairContractFinishedEvent为例来说明。

/// <summary>
/// 维修合同完成的事件
/// </summary>
public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContract RepairContract { get; set; } //一个维修合同会对应多个调整单
public List<RepairContractAdjust> RepairContractAdjusts { get; set; } public string Key { get; set; }
}

  这个里面RepairContract以及List集合都是我们定义的一种数据结构。

  最后我们来看看在具体的领域层中我们该如何触发此事件的,这里我们也定义了一个叫做IRepairContractEventManager接口的领域服务,并在里面定义了一个叫做Finished的接口,然后在RepairContractEventManager中实现该方法。

 public class RepairContractEventManager : DomainService, IRepairContractEventManager {
private readonly KafkaProducer _producer;
private readonly IRepository<RepairContract, Guid> _repairContractRepository;
private readonly IRepository<RepairContractAdjust, Guid> _repairContractAdjustRepository; public RepairContractEventManager(KafkaProducer producer,
IRepository<RepairContract, Guid> repairContractRepository,
IRepository<RepairContractAdjust, Guid> repairContractAdjustRepository) {
_producer = producer;
_repairContractRepository = repairContractRepository;
_repairContractAdjustRepository = repairContractAdjustRepository;
} public void Finished(Guid repairContractId) {
var repairContract = _repairContractRepository.GetAll()
.Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials)
.SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();
var repairContractAdjusts = _repairContractAdjustRepository.GetAll()
.Include(a => a.WorkItems).ThenInclude(w => w.Materials)
.Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult(); var @event = new RepairContractFinishedEvent {
Key = repairContract?.Code,
RepairContract = repairContract,
RepairContractAdjusts = repairContractAdjusts
};
_producer.Produce(@event);
}
}

  这段代码就是组装RepairContractFinishedEvent的具体实现过程,然后调用我们之前创建的KafkaProducer对象然后将消息发送出去,这样在需要触发当前RepairContractFinishedEvent 的地方来注入IRepairContractEventManager接口,然后调对应的Finished方法,这样就完成了整个消息的发送的过程了。

  三 查看消息的发送

  在发送完消息后我们可以到Kafka 集群 Control Center中查找我们发送的所有消息。选择其中的一条消息,双击,然后选择INSPECT来查看发送的消息

在Asp.Net Core中集成Kafka

图四 Kafka Control Center中查看发送消息

  这里通过这个网页去观察发送和接收的消息时有时候会存在一定的延时,这里推荐另外一个Kafka Tool 的工具,通过这个工具能够比较好的实时监测发送和接收消息,先来看看整个界面,然后我们再来看看到底该怎么配置这个软件。

在Asp.Net Core中集成Kafka

图五 Kafka Tool中查看发送消息

  要想使用这个工具,首先第一步就是要配置Cluster端,具体的配置我们看看有哪些东西。

在Asp.Net Core中集成Kafka

图六 Kafka Tool中新增Cluster

  另外一点需要注意的就是默认接收到的Message都是byte数组,我们这里需要配置ContentType,配置的方法是选中其中的一条消息--》Properties--》ContentType-->Update

在Asp.Net Core中集成Kafka

图七 Kafka Tool中配置ContentType 

  四 消息的接收方(Consumer) 

  在正确创建消息的发送方后紧接着就是定义消息的接收方了,消息的接收方顾名思义就是消费刚才消息的一方,这里的步骤和发送类似,但是也有很大的不同,消息的消费方核心是一个后台服务,并且在单独的线程中监听来自发送方的消息,并进行消费,这里我们先定义一个叫做KafkaConsumerHostedService的基类,我们具体来看看代码。

/// <summary>
/// Kafka 消费者的后台服务基础类
/// </summary>
/// <typeparam name="T">事件类型</typeparam>
public abstract class KafkaConsumerHostedService<T> : BackgroundService where T : IIntegrationEvent {
protected readonly IServiceProvider _services;
protected readonly IConfiguration _config;
protected readonly ILogger<KafkaConsumerHostedService<T>> _logger; public KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger<KafkaConsumerHostedService<T>> logger) {
_services = services;
_config = config;
_logger = logger;
} /// <summary>
/// 消费该事件,比如调用 Application Service 持久化数据等
/// </summary>
/// <param name="event">事件内容</param>
protected abstract void DoWork(T @event); /// <summary>
/// 构造 Kafka 消费者实例,监听指定 Topic,获得最新的事件
/// </summary>
/// <param name="stoppingToken">终止标识</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await Task.Factory.StartNew(() => {
var topic = _config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}"); var consumerConfig = new ConsumerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = _config.GetValue<string>("Application:Name"),
EnableAutoCommit = true,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build()) {
consumer.Subscribe(topic);
while (!stoppingToken.IsCancellationRequested) {
try {
var result = consumer.Consume(stoppingToken);
var @event = JsonConvert.DeserializeObject<T>(result.Value);
DoWork(@event);
//consumer.StoreOffset(result);
} catch (OperationCanceledException ex) {
consumer.Close();
_logger.LogDebug(ex, "Kafka 消费者结束,退出后台线程");
} catch (AbpValidationException ex) {
_logger.LogError(ex, $"Kafka {GetValidationErrorNarrative(ex)}");
} catch (ConsumeException ex) {
_logger.LogError(ex, "Kafka 消费者产生异常");
} catch (KafkaException ex) {
_logger.LogError(ex, "Kafka 产生异常");
} catch (ValidationException ex) {
_logger.LogError(ex, "Kafka 消息验证失败");
} catch (Exception ex) {
_logger.LogError(ex, "Kafka 捕获意外异常");
}
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} private string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("验证过程中检测到以下错误"); foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
} return detailBuilder.ToString();
}
}

  这段代码中我们会创建一个consumer,这里我们会在一个While循环中去订阅特定Topic消息,这里的BootstrapServers是和发送方保持一致,并且也是在当前应用程序中的appsetting.json中进行配置的,而且这里的consumer.Consume方法是一个阻塞式方法,当发送方发送特定事件后,这里会接收到同样名称的Topic的消息,然后将接收到的Json数据进行反序列化,然后交由后面的DoWork方法进行处理。这里还是以之前生成者发送的RepairContractFinished事件为例,这里也需要定义一个RepairContractFinishedEventHandler来处理生产者发送的消息。

public class RepairContractFinishedEventHandler : KafkaConsumerHostedService<RepairContractFinishedEvent> {
public RepairContractFinishedEventHandler(IServiceProvider services,
IConfiguration config, ILogger<KafkaConsumerHostedService<RepairContractFinishedEvent>> logger)
: base(services, config, logger) {
} /// <summary>
/// 调用 Application Service,新增或更新维修合同及关联实体
/// </summary>
/// <param name="event">待消费的事件</param>
protected override void DoWork(RepairContractFinishedEvent @event) {
using (var scope = _services.CreateScope()) {
var service = scope.ServiceProvider.GetRequiredService<IRepairContractAppService>();
service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);
}
}
}

  这里需要特别注意的是在这里我么也需要定义一个继承自IIntegrationEvent接口的事件,这里也是定义一种数据结构,并且这里的数据结构和生成者定义的要保持一致,否则消费方在反序列化的时候会丢失不能够匹配的信息。

public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContractDto RepairContract { get; set; } public List<RepairContractAdjustDto> RepairContractAdjusts { get; set; } public string Key { get; set; }
}

  另外在DoWork方法中我们也需要注意代码也需要用using包裹,从而在消费方消费完后释放掉当前的应用服务。最后需要注意的就是我们的每一个Handle都是一个后台服务,我们需要在Asp.Net Core的Startup的ConfigureServices进行配置,从而将当前的后台服务添加到Asp.Net Core依赖注入容器中。

   /// <summary>
/// 注册集成事件的处理器
/// </summary>
/// <param name="services"></param>
private void AddIntegrationEventHandlers(IServiceCollection services) {
services.AddHostedService<RepairContractFinishedEventHandler>();
services.AddHostedService<ProductTransferDataEventHandler>();
services.AddHostedService<PartUpdateEventHandler>();
services.AddHostedService<VehicleSoldFinishedEventHandler>();
services.AddHostedService<AddOrUpdateDealerEventHandler>();
services.AddHostedService<AddOrUpdateProductCategoryEventHandler>();
services.AddHostedService<CustomerFinishedEventHandler>();
services.AddHostedService<VehicleSoldUpdateStatusEventHandler>();
services.AddHostedService<AddCustomerEventHandler>();
}

  最后我们也看看我们的appsetting.json的配置文件关于kafka的配置。

"Kafka": {
"BootstrapServers": "127.0.0.1:9092",
"MessageTimeoutMs": 5000,
"Topics": {
"RepairContractFinishedEvent": "repair-contract-finished",
"AddOrUpdateProductCategoryEvent": "add-update-product-category",
"AddOrUpdateDealerEvent": "add-update-dealer",
"ClaimApproveEvent": "claim-approve",
"ProductTransferDataEvent": "product-update",
"PartUpdateEvent": "part-update",
"VehicleSoldFinishedEvent": "vehiclesold-finished",
"CustomerFinishedEvent": "customer-update",
"VehicleInformationUpdateStatusEvent": "add-update-vehicle-info",
"AddCustomerEvent": "add-customer"
}
},

  这里需要注意的是发送方和接收方必须保证Topic一致,并且配置的服务器名称端口保持一致,这样才能够保证消息的准确发送和接收。最后对于服务端,这里推荐一个VSCode的插件kafka,能够创建并发送消息,这样就方便我们来发送我们需要的数据了,这里同样需要我们先建立一个.kafka的文件,然后配置Kafka服务的地址和端口号。

在Asp.Net Core中集成Kafka

图八 利用VSCode Kafka插件发送消息

  

相关文章