源码解析-Abp vNext丨分布式事件总线DistributedEventBus

时间:2022-08-27 12:09:18

前言

上一节咱们讲了LocalEventBus,本节来讲本地事件总线(DistributedEventBus),采用的RabbitMQ进行实现。

Volo.Abp.EventBus.RabbitMQ模块内部代码并不多,RabbitMQ的操作都集中在Volo.Abp.RabbitMQ这个包中。

正文

我们从模块定义开始看,项目启动的时候分别读取了appsetting.json的配置参数和调用了RabbitMqDistributedEventBusInitialize函数。

    public class AbpEventBusRabbitMqModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration(); Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
} public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context
.ServiceProvider
.GetRequiredService<RabbitMqDistributedEventBus>()
.Initialize();
}
}

Initialize函数中我们根据 MessageConsumerFactory.Create向内部进行查阅可以看到最终调用方法为RabbitMqMessageConsumer.TryCreateChannelAsync并且在其内部我们可以看到下面代码,这里定义了消费的回调函数。反推Initialize方法其实是在启动一个消费者。

      public void Initialize()
{
Consumer = MessageConsumerFactory.Create(
new ExchangeDeclareConfiguration(
AbpRabbitMqEventBusOptions.ExchangeName,
type: "direct",
durable: true
),
new QueueDeclareConfiguration(
AbpRabbitMqEventBusOptions.ClientName,
durable: true,
exclusive: false,
autoDelete: false
),
AbpRabbitMqEventBusOptions.ConnectionName
); Consumer.OnMessageReceived(ProcessEventAsync); SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
 var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;

继续向下看Consumer.OnMessageReceived(ProcessEventAsync);该方法向一个并发安全集合输入一个委托事件,并该事件会在上面的HandleIncomingMessageAsync会调中触发故确定为消费者的执行逻辑,而ProcessEventAsync其实还是走了我们在讲LocalEventBus哪一套,寻找Handler执行函数。

SubscribeHandlers还是上节讲的基类的函数,这里要注意内部调用的Subscribe该方法中的 Consumer.BindAsync会根据为消费者Bind路由,这样才能触发事件处理函数。


public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType); if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
} handlerFactories.Add(factory); if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
} return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}

看完了事件消费者我们来看看事件发布,直接看PublishAsync函数就完事了,整个函数非常简单,都是RabbitMQ的操作语法,这里的路由Key是在EventNameAttribute.GetNameOrDefault(eventType);函数中通过读取ETO上指定注解Name来指定的。

protected Task PublishAsync(
string eventName,
byte[] body,
IBasicProperties properties,
Dictionary<string, object> headersArguments = null,
Guid? eventId = null)
{
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
{
channel.ExchangeDeclare(
AbpRabbitMqEventBusOptions.ExchangeName,
"direct",
durable: true
); if (properties == null)
{
properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
} if (properties.MessageId.IsNullOrEmpty())
{
properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
} SetEventMessageHeaders(properties, headersArguments); channel.BasicPublish(
exchange: AbpRabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body
);
} return Task.CompletedTask;
}

解析

整个分布式事件的实现其实非常简单,在事件发生时发布者只需要定义好路由名称和消息内容发送RabbitMQ中,而消费者则是在项目运行的时候的通过调用Initialize就启动起来了。

这里我们也同样根据整个原理自己实现一下这个流程。

Dppt.EventBus分别定义IDistributedEventBus、DistributedEventBusOptions、IDistributedEventHandler分别用于采用分布式事件总线调用、配置选项用于存储处理程序Handler、定义分布式处理程序抽象。

新建Dppt.EventBus.RabbitMQ类库先简单对RabbitMQ进行一个简单的封装

public class RabbitMqConnections : IRabbitMqConnections
{
private readonly IConnectionFactory _connectionFactory;
private readonly ILogger<RabbitMqConnections> _logger;
IConnection _connection;
bool _disposed;
public RabbitMqConnections(IConnectionFactory connectionFactory, ILogger<RabbitMqConnections> logger)
{
_connectionFactory = connectionFactory;
_logger = logger;
} public bool IsConnected
{
get
{
return _connection != null && _connection.IsOpen && !_disposed;
}
} public void TryConnect() { _connection = _connectionFactory.CreateConnection(); } public IModel CreateModel()
{
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
} return _connection.CreateModel();
} public void Dispose()
{
if (_disposed) return; _disposed = true; try
{
_connection.Dispose();
}
catch (IOException ex)
{
_logger.LogCritical(ex.ToString());
}
} }

然后我们分别定义ExchangeDeclareConfiguration、QueueDeclareConfiguration用于记录配置信息。

开始处理RabbitMqEventBus处理程序首先是发布事件,大体代码如下就是往RabbitMQ里面丢消息。

        /// <summary>
/// rabbmitmq 连接服务
/// </summary>
public readonly IRabbitMqConnections _rabbitMqConnections; public Task PublishAsync<TEvent>(TEvent eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(typeof(TEvent));
var body = JsonSerializer.Serialize(eventData);
return PublishAsync(eventName, body, null, null);
} public Task PublishAsync(string eventName, string body, IBasicProperties properties, Dictionary<string, object> headersArguments = null, Guid? eventId = null)
{ if (!_rabbitMqConnections.IsConnected)
{
_rabbitMqConnections.TryConnect();
}
using (var channel = _rabbitMqConnections.CreateModel())
{
// durable 设置队列持久化
channel.ExchangeDeclare(RabbitMqEventBusOptions.ExchangeName, "direct", durable: true); if (properties == null)
{
properties = channel.CreateBasicProperties();
// 设置消息持久化
properties.DeliveryMode = 2;
} if (properties.MessageId.IsNullOrEmpty())
{
// 消息的唯一性标识
properties.MessageId = (eventId ?? Guid.NewGuid()).ToString("N");
} SetEventMessageHeaders(properties, headersArguments); channel.BasicPublish(
exchange: RabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(body)
); } return Task.CompletedTask;
} private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments)
{
if (headersArguments == null)
{
return;
} properties.Headers ??= new Dictionary<string, object>(); foreach (var header in headersArguments)
{
properties.Headers[header.Key] = header.Value;
}
}

然后就是消费者的处理,我们同样定义Initialize函数,并简化部分封装代码,完成消费者启动。

 public void Initialize()
{ Exchange = new ExchangeDeclareConfiguration(RabbitMqEventBusOptions.ExchangeName,"direct",true);
Queue = new QueueDeclareConfiguration(RabbitMqEventBusOptions.ClientName, true, false, false); // 启动一个消费者
if (!_rabbitMqConnections.IsConnected)
{
_rabbitMqConnections.TryConnect();
} try
{ Channel = _rabbitMqConnections.CreateModel(); Channel.ExchangeDeclare(
exchange: Exchange.ExchangeName,
type: Exchange.Type,
durable: Exchange.Durable,
autoDelete: Exchange.AutoDelete,
arguments: Exchange.Arguments
); Channel.QueueDeclare(
queue: Queue.QueueName,
durable: Queue.Durable,
exclusive: Queue.Exclusive,
autoDelete: Queue.AutoDelete,
arguments: Queue.Arguments
); var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync; Channel.BasicConsume(
queue: Queue.QueueName,
autoAck: false,
consumer: consumer
); SubscribeHandlers(DistributedEventBusOptions.Handlers);
}
catch (Exception ex)
{
Console.WriteLine("Error:" + ex.Message);
}
}

参数配置这边主要是读取AppSetting信息和索要Handler

 public static class DpptEventBusRabbitMqRegistrar
{
public static void AddDpptEventBusRabbitMq(this IServiceCollection services, IConfiguration configuration, List<Type> types)
{ services.AddSingleton<IRabbitMqConnections>(sp =>
{
var logger = sp.GetRequiredService<ILogger<RabbitMqConnections>>(); var factory = new ConnectionFactory()
{
HostName = configuration["RabbitMQ:EventBusConnection"],
VirtualHost = configuration["RabbitMQ:EventBusVirtualHost"],
DispatchConsumersAsync = true,
AutomaticRecoveryEnabled = true
}; if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusUserName"]))
{
factory.UserName = configuration["RabbitMQ:EventBusUserName"];
} if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusPassword"]))
{
factory.Password = configuration["RabbitMQ:EventBusPassword"];
} return new RabbitMqConnections(factory, logger);
}); var distributedHandlers = types;
foreach (var item in distributedHandlers)
{
services.AddSingleton(item);
} services.Configure<DistributedEventBusOptions>(options =>
{
options.Handlers.AddIfNotContains(distributedHandlers);
}); services.Configure<DpptRabbitMqEventBusOptions>(options => { options.ExchangeName = configuration["RabbitMQ:EventBus:ExchangeName"];
options.ClientName = configuration["RabbitMQ:EventBus:ClientName"];
}); services.AddSingleton<IDistributedEventBus, RabbitMqDistributedEventBus>(); }
}

测试

新建一个空项目,进行插件注册,然后创建ETO和Handler进行测试。

源码解析-Abp vNext丨分布式事件总线DistributedEventBus

测试结果放在下面了。

源码解析-Abp vNext丨分布式事件总线DistributedEventBus

源码解析-Abp vNext丨分布式事件总线DistributedEventBus

结语

本次挑选了一个比较简单的示例来讲,整个EventBus我应该分成3篇 下一篇我来讲分布式事务。

最后欢迎各位读者关注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 欢迎大家Star

另外这里有个社区地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技术点希望我提前档期可以写在这里,希望本项目助力我们一起成长