在上一篇中我们主要介绍如何在Asp.Net Core中同步Kafka消息,通过上一篇的操作我们发现上面一篇中介绍的只能够进行简单的首发kafka消息并不能够消息重发、重复消费、乐观锁冲突等问题,这些问题在实际的生产环境中是非常要命的,如果在消息的消费方没有做好必须的幂等性操作,那么消费者重复消费的问题会比较严重的,另外对于消息的生产者来说,记录日志的方式也不是足够友好,很多时候在后台监控程序中我们需要知道记录更多的关于消息的分区、偏移等更多的消息。而在消费者这边我们更多的需要去解决发送方发送重复消息,以及面对乐观锁冲突的时候该怎么解决这些问题,当然代码中的这些方案都是我们在实际生产中摸索出来的一些方案,当然这些都是需要后续进行进一步优化的,这里我们将分别就生产者和消费者中出现的问题来进行分析和说明。
图一 消费者方几乎同一时刻接收到两条同样的Kafka消息(Grafana监控)
一 生产者方
using System;
using System.ComponentModel.DataAnnotations;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Abp.Dependency;
using Confluent.Kafka;
using JetBrains.Annotations;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Sunlight.Kafka.Abstractions; [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] namespace Sunlight.Kafka {
/// <summary>
/// Kafka 生产者的 Domain Service
/// </summary>
public class KafkaProducer : ISingletonDependency, IDisposableDependencyObjectWrapper<IProducer<string, string>>, IMessageProducer {
private readonly IConfiguration _config;
private readonly ILogger<KafkaProducer> _logger;
private readonly IProducer<string, string> _producer; /// <summary>
/// 构造 <see cref="KafkaProducer"/>
/// </summary>
/// <param name="config"></param>
/// <param name="logger"></param>
public KafkaProducer(IConfiguration config,
ILogger<KafkaProducer> logger) {
_config = config;
_logger = logger; var producerConfig = new ProducerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")
}; var builder = new ProducerBuilder<string, string>(producerConfig);
_producer = builder.Build();
Object = _producer;
} /// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
public void Produce(IIntegrationEvent @event) {
ProduceAsync(@event).GetAwaiter().GetResult();
} /// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
public async Task ProduceAsync(IIntegrationEvent @event) {
await ProduceAsync(@event, @event.GetType().Name);
} /// <inheritdoc />
public async Task ProduceAsync(IIntegrationEvent @event, [NotNull] string eventName) {
if (string.IsNullOrEmpty(eventName)) {
throw new ArgumentNullException(nameof(eventName));
} var topic = _config.GetValue<string>($"Kafka:Topics:{eventName}");
if (string.IsNullOrEmpty(topic)) {
throw new NullReferenceException("topic不能为空");
}
var key = Guid.NewGuid().ToString();
try {
var json = JsonConvert.SerializeObject(@event);
var dr = await _producer.ProduceAsync(topic, new Message<string, string> { Key = key, Value = json });
_logger.LogInformation($"成功发送消息 {dr.Key}.{ @event.Key}, offSet: {dr.TopicPartitionOffset}");
} catch (ProduceException<string, string> ex) {
_logger.LogError(ex, $"发送失败 {topic}.{key}.{ @event.Key}, 原因 {ex.Error.Reason} ");
throw new ValidationException("当前服务器繁忙,请稍后再尝试");
}
} /// <summary>
/// 释放方法
/// </summary>
public void Dispose() {
_producer?.Dispose();
} /// <summary>
/// 要释放的对象
/// </summary>
public IProducer<string, string> Object { get; }
}
}
在这里我们来看看IMessageProducer接口定义
using System.Threading.Tasks;
using Sunlight.Kafka.Abstractions; namespace Sunlight.Kafka
{
/// <summary>
/// 消息的生产者
/// </summary>
public interface IMessageProducer
{
/// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
void Produce(IIntegrationEvent @event); /// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
Task ProduceAsync(IIntegrationEvent @event); /// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
/// <param name="eventName">指定事件的名称</param>
/// <returns></returns>
Task ProduceAsync(IIntegrationEvent @event, string eventName);
}
}
在接口中我们分别定义了消息发送的同步和异步及重载方法,另外我们还继承了ABP中的IDisposableDependencyObjectWrapper接口,关于这个接口我们来看一下接口的声明和定义(想了解更多的关于ABP的知识,也可点击这里关注本人之前的博客)。
using System; namespace Abp.Dependency
{
/// <summary>
/// This interface is used to wrap an object that is resolved from IOC container.
/// It inherits <see cref="IDisposable"/>, so resolved object can be easily released.
/// In <see cref="IDisposable.Dispose"/> method, <see cref="IIocResolver.Release"/> is called to dispose the object.
/// This is non-generic version of <see cref="IDisposableDependencyObjectWrapper{T}"/> interface.
/// </summary>
public interface IDisposableDependencyObjectWrapper : IDisposableDependencyObjectWrapper<object>
{ }
}
如果想了解关于这个接口更多的信息,请点击这里。
另外在实际发送消息的时候,我们需要记录消息的具体Partition以及Offset这样我们就能够快速找到这条消息,从而方便后面的重试,另外有时候由于服务器的网络问题的时候可能抛出MessageTimeout的消息,这个时候我们需要通过Confluent.Kafka库中的ProduceException异常来捕获这些信息记录抛出异常信息,另外在我们的业务层需要给出一个“当前服务器繁忙,请稍后再尝试”这样一个友好的提示信息。
另外在发送消息的时候,每一次都会产生一个Guid类型的Key发送到消息的消费方,这个Key将会作为接收消息的实体KafkaReceivedMessage 的主键Id,这个会在后文有具体的解释。
二 消费者方
在我们这篇文章记录的重点就是消费方,因为这里我们需要解决诸如消息重复消费以及乐观锁冲突的一系列问题,后面我们将会就这些问题来一一进行讲解和说明。
2.1 如何解决消息重复消费
在这里我们通过KafkaReceivedMessage这样一个实体来在数据库中记录收到的消息,并且在发送方每次发送时候传递唯一的一个Guid,这样我们就简单利用每次插入消息时主键Id不允许重复来处理重复发送的同一条消息的问题,我们首先来看看这个实体。
/// <summary>
/// Kafka消费者收到的消息记录
/// </summary>
public class KafkaReceivedMessage : Entity<Guid> {
/// <summary>
/// 消费者组
/// </summary>
[MaxLength(50)]
[Required]
public string Group { get; set; } /// <summary>
/// 消息主题
/// </summary>
[MaxLength(100)]
[Required]
public string Topic { get; set; } /// <summary>
/// 消息编号, 用于记录日志, 便于区分, 建议用编号
/// </summary>
[MaxLength(50)]
public string Code { get; set; } /// <summary>
/// 消息内容
/// </summary>
[MaxLength(int.MaxValue)]
public string Content { get; set; } /// <summary>
/// kafka 中的 partition
/// </summary>
public int? Partition { get; set; } /// <summary>
/// kafka 中的 offset
/// </summary>
[MaxLength(100)]
[Required]
public string Offset { get; set; } /// <summary>
/// 接受时间
/// </summary>
public DateTime ReceivedTime { get; set; } /// <summary>
/// 过期时间
/// </summary>
public DateTime? ExpiresAt { get; set; } /// <summary>
/// 重试次数
/// </summary>
public int Retries { get; set; } /// <summary>
/// 不是用Guid做全局唯一约束的消息
/// </summary>
public bool Old { get; set; } /// <inheritdoc />
public override string ToString() {
return $"{Group}.{Topic}.{Id}.{Code},{Partition}:{Offset}";
}
有了这个实体,我们在接收到这条消息的时候我们首先会尝试将这条消息存入到数据库,如果存入成功就说明不是重复消息,如果存入失败,就记录Kafka收到重复消息,我们先来看一下具体的实现。
using System;
using System.ComponentModel.DataAnnotations;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Data.SqlClient;
using Abp.Domain.Uow;
using Abp.Runtime.Validation;
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Sunlight.Kafka.Abstractions;
using Sunlight.Kafka.Models; namespace Sunlight.Kafka {
/// <summary>
/// Kafka 消费者的后台服务基础类
/// </summary>
/// <typeparam name="T">事件类型</typeparam>
public abstract class KafkaConsumerHostedService<T> : BackgroundService where T : IIntegrationEvent {
/// <summary>
/// IOC服务提供方
/// </summary>
protected IServiceProvider Services { get; } /// <summary>
/// 配置文件
/// </summary>
protected IConfiguration Config { get; } /// <summary>
/// 主题
/// </summary>
protected string Topic { get; } /// <summary>
/// 日志
/// </summary>
protected ILogger<KafkaConsumerHostedService<T>> Logger { get; } /// <summary>
/// DbContext的类型, 必须是业务中实际的类型
/// </summary>
protected Type DbContextType { get; } /// <summary>
/// 消费者的配置
/// </summary>
protected ConsumerConfig ConsumerConfig { get; } /// <summary>
/// 保存失败时的重复次数, 一般用于 DbUpdateConcurrencyException
/// </summary>
protected int SaveDataRetries { get; } /// <summary>
/// 构造 <see cref="KafkaConsumerHostedService{T}"/>
/// </summary>
/// <param name="services"></param>
/// <param name="config"></param>
/// <param name="logger"></param>
/// <param name="dbContext">DbContext的类型, 必须是业务中实际的类型</param>
protected KafkaConsumerHostedService(IServiceProvider services,
IConfiguration config,
ILogger<KafkaConsumerHostedService<T>> logger, DbContext dbContext) {
Services = services;
Config = config;
Logger = logger;
DbContextType = dbContext.GetType(); Topic = Config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");
if (string.IsNullOrWhiteSpace(Topic)) {
Logger.LogCritical($"未能找到{typeof(T).Name}所对应的Topic");
Environment.Exit(0);
} const int MaxRetries = 5;
const int DefaultRetries = 2;
SaveDataRetries = Config.GetValue<int?>("Kafka:SaveDataRetries") ?? DefaultRetries;
SaveDataRetries = Math.Min(SaveDataRetries, MaxRetries); ConsumerConfig = new ConsumerConfig {
BootstrapServers = Config.GetValue<string>("Kafka:BootstrapServers"),
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = Config.GetValue<string>("Application:Name"),
EnableAutoCommit = true
};
} /// <summary>
/// 消费该事件,比如调用 Application Service 持久化数据等
/// </summary>
/// <param name="event">事件内容</param>
protected abstract void DoWork(T @event); /// <summary>
/// 保存收到的消息到数据库, 防止重复消费
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
private async Task<bool> SaveMessageAsync(KafkaReceivedMessage message) {
using var scope = Services.CreateScope();
var service = (DbContext)scope.ServiceProvider.GetRequiredService(DbContextType);
service.Set<KafkaReceivedMessage>().Add(message);
try {
await service.SaveChangesAsync();
Logger.LogInformation($"Kafka 收到消息 {message}");
return true;
} catch (DbUpdateException ex) when (ex.InnerException?.Message.Contains("PRIMARY KEY") == true) {
Logger.LogError($"Kafka 收到重复消息 {message}");
} finally {
service.Entry(message).State = EntityState.Detached;
}
return false;
} /// <summary>
/// 反序列化消息
/// </summary>
/// <param name="result"></param>
/// <param name="message"></param>
/// <returns></returns>
protected virtual async Task<T> DeserializeEvent(ConsumeResult<string, string> result, KafkaReceivedMessage message) {
T @event;
try {
@event = JsonConvert.DeserializeObject<T>(result.Value);
} catch (Exception e) when(e is JsonReaderException || e is JsonSerializationException || e is JsonException) {
@event = default;
if (!await SaveMessageAsync(message))
Logger.LogError(e, ErrorMessageTemp, message, e.InnerException?.Message ?? e.Message);
} if (Guid.TryParse(result.Key, out var key) && result.Key != @event?.Key) {
message.Code = @event?.Key;
message.Id = key;
} else {
message.Id = Guid.NewGuid();
message.Code = result.Key;
message.Old = true;
} return await SaveMessageAsync(message) ? @event : default;
} private async Task TryDoWork(T @event, KafkaReceivedMessage message, int saveRetries) {
if (saveRetries <= 0) { Logger.LogError(ErrorMessageTemp, message, "乐观锁冲突");
return;
} try {
DoWork(@event);
// 在遇到 乐观锁冲突的时候, 需要重试几次, 因为这很容易就发生了.
} catch (DbUpdateConcurrencyException) {
#pragma warning disable SCS0005 // Weak random generator // 这样在收到重复消息的时候, 能降低冲突的概率
await Task.Delay(new Random(DateTime.Now.Millisecond).Next(10,100));
await TryDoWork(@event, message, --saveRetries);
} catch (AbpDbConcurrencyException) {
await Task.Delay(new Random(DateTime.Now.Millisecond).Next(10,100));
await TryDoWork(@event, message, --saveRetries);
}
#pragma warning restore SCS0005 // Weak random generator } const string ErrorMessageTemp = "Kafka 消息 {0} 消费失败, 原因: {1}"; /// <summary>
/// 构造 Kafka 消费者实例,监听指定 Topic,获得最新的事件
/// </summary>
/// <param name="stoppingToken">终止标识</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await Task.Factory.StartNew(async () => {
var builder = new ConsumerBuilder<string, string>(ConsumerConfig);
using var consumer = builder.Build();
consumer.Subscribe(Topic);
//当前事件的Key
Logger.LogInformation($"Kafka 消费者订阅 {Topic}");
while (!stoppingToken.IsCancellationRequested) {
try {
var result = consumer.Consume(stoppingToken);
//包含分区和OffSet的详细信息
var message = new KafkaReceivedMessage {
Group = ConsumerConfig.GroupId,
Topic = result.Topic,
Content = result.Value,
Partition = result.Partition,
Offset = result.Offset.ToString(),
ReceivedTime = DateTime.Now
};
try {
var @event = await DeserializeEvent(result, message);
if (@event == null)
continue;
TryDoWork(@event, message, SaveDataRetries);
} catch (ValidationException ex) {
Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
} catch (AbpValidationException ex) {
Logger.LogError(ex, ErrorMessageTemp, message, GetValidationErrorNarrative(ex));
} catch (SqlException ex) {
Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
} catch (Exception ex) {
Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
}
} catch (OperationCanceledException ex) {
consumer.Close();
Logger.LogInformation(ex, "Kafka 消费者结束,退出后台线程");
} catch (ConsumeException ex) {
Logger.LogError(ex, "Kafka 消费者产生异常,");
} catch (KafkaException ex) {
Logger.LogError(ex, "Kafka 产生异常,");
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
} private static string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("验证过程中检测到以下错误"); foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
} return detailBuilder.ToString();
}
}
}
这里我们通过SaveMessageAsync这个异步方法来保存数据到数据库,检测的时候我们通过捕获InnerException里面的Message中是否包含"PRIMARY KEY"来判断是不是主键冲突的。
3.2 乐观锁冲突校验
在做了第一步消息重复消费校验后,我们需要利用数据库中的DbUpdateConcurrencyException来捕获乐观锁的冲突,因为我们的业务处理都是通过继承KafkaConsumerHostedService这个基类,然后重载里面的DoWork方法来实现对业务代码的调用的,当然由于Kafka消息的异步特性,所以不可避免多个消息同时修改同一个实体,而由于这些异步消息产生的DbUpdateConcurrencyException就不可避免,在这里我们采用的默认次数是2次,最多可以重试5次的机制,通过这种方式来保证乐观锁冲突,如果5次重试还是失败则会提示乐观锁冲突,并且日志记录当前错误内容,通过这种方式能够在一定程度上减少由于并发问题导致的消费者消费失败的概率,当然关于这方面的探索还在随着业务的不断深入而不断去优化,期待后续的持续关注。
3.3 异常的捕获与处理
在我们的接收到消息以后会产生各种异常,如果处理这些异常也是非常重要的,当然根据这些异常的级别分别记录不同级别的日志是非常重要的,这里仅选择一种AbpValidationException这一种特例来进行说明,如果你对ABP中的AbpValidationException还不是很熟悉的话,请先阅读这篇文章。
private static string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("验证过程中检测到以下错误"); foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
} return detailBuilder.ToString();
}
由于在这里ABP中ValidationException中ValidationErrors会记录一组之前验证的错误信息,所以这里需要特别注意,这里在阅读的时候需要特别注意。