前言
上一节咱们讲了LocalEventBus,本节来讲本地事件总线(DistributedEventBus),采用的RabbitMQ进行实现。
Volo.Abp.EventBus.RabbitMQ
模块内部代码并不多,RabbitMQ的操作都集中在Volo.Abp.RabbitMQ
这个包中。
正文
我们从模块定义开始看,项目启动的时候分别读取了appsetting.json
的配置参数和调用了RabbitMqDistributedEventBus
的Initialize
函数。
public class AbpEventBusRabbitMqModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context
.ServiceProvider
.GetRequiredService<RabbitMqDistributedEventBus>()
.Initialize();
}
}
在Initialize
函数中我们根据 MessageConsumerFactory.Create
向内部进行查阅可以看到最终调用方法为RabbitMqMessageConsumer.TryCreateChannelAsync
并且在其内部我们可以看到下面代码,这里定义了消费的回调函数。反推Initialize
方法其实是在启动一个消费者。
public void Initialize()
{
Consumer = MessageConsumerFactory.Create(
new ExchangeDeclareConfiguration(
AbpRabbitMqEventBusOptions.ExchangeName,
type: "direct",
durable: true
),
new QueueDeclareConfiguration(
AbpRabbitMqEventBusOptions.ClientName,
durable: true,
exclusive: false,
autoDelete: false
),
AbpRabbitMqEventBusOptions.ConnectionName
);
Consumer.OnMessageReceived(ProcessEventAsync);
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;
继续向下看Consumer.OnMessageReceived(ProcessEventAsync);
该方法向一个并发安全集合输入一个委托事件,并该事件会在上面的HandleIncomingMessageAsync
会调中触发故确定为消费者的执行逻辑,而ProcessEventAsync
其实还是走了我们在讲LocalEventBus哪一套,寻找Handler执行函数。
SubscribeHandlers
还是上节讲的基类的函数,这里要注意内部调用的Subscribe
该方法中的 Consumer.BindAsync
会根据为消费者Bind路由,这样才能触发事件处理函数。
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
}
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
看完了事件消费者我们来看看事件发布,直接看PublishAsync
函数就完事了,整个函数非常简单,都是RabbitMQ的操作语法,这里的路由Key是在EventNameAttribute.GetNameOrDefault(eventType);
函数中通过读取ETO上指定注解Name来指定的。
protected Task PublishAsync(
string eventName,
byte[] body,
IBasicProperties properties,
Dictionary<string, object> headersArguments = null,
Guid? eventId = null)
{
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
{
channel.ExchangeDeclare(
AbpRabbitMqEventBusOptions.ExchangeName,
"direct",
durable: true
);
if (properties == null)
{
properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
}
if (properties.MessageId.IsNullOrEmpty())
{
properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
}
SetEventMessageHeaders(properties, headersArguments);
channel.BasicPublish(
exchange: AbpRabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body
);
}
return Task.CompletedTask;
}
解析
整个分布式事件的实现其实非常简单,在事件发生时发布者只需要定义好路由名称和消息内容发送RabbitMQ中,而消费者则是在项目运行的时候的通过调用Initialize
就启动起来了。
这里我们也同样根据整个原理自己实现一下这个流程。
在Dppt.EventBus
分别定义IDistributedEventBus、DistributedEventBusOptions、IDistributedEventHandler
分别用于采用分布式事件总线调用、配置选项用于存储处理程序Handler、定义分布式处理程序抽象。
新建Dppt.EventBus.RabbitMQ
类库先简单对RabbitMQ进行一个简单的封装
public class RabbitMqConnections : IRabbitMqConnections
{
private readonly IConnectionFactory _connectionFactory;
private readonly ILogger<RabbitMqConnections> _logger;
IConnection _connection;
bool _disposed;
public RabbitMqConnections(IConnectionFactory connectionFactory, ILogger<RabbitMqConnections> logger)
{
_connectionFactory = connectionFactory;
_logger = logger;
}
public bool IsConnected
{
get
{
return _connection != null && _connection.IsOpen && !_disposed;
}
}
public void TryConnect() {
_connection = _connectionFactory.CreateConnection();
}
public IModel CreateModel()
{
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
}
return _connection.CreateModel();
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try
{
_connection.Dispose();
}
catch (IOException ex)
{
_logger.LogCritical(ex.ToString());
}
}
}
然后我们分别定义ExchangeDeclareConfiguration、QueueDeclareConfiguration
用于记录配置信息。
开始处理RabbitMqEventBus处理程序首先是发布事件,大体代码如下就是往RabbitMQ里面丢消息。
/// <summary>
/// rabbmitmq 连接服务
/// </summary>
public readonly IRabbitMqConnections _rabbitMqConnections;
public Task PublishAsync<TEvent>(TEvent eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(typeof(TEvent));
var body = JsonSerializer.Serialize(eventData);
return PublishAsync(eventName, body, null, null);
}
public Task PublishAsync(string eventName, string body, IBasicProperties properties, Dictionary<string, object> headersArguments = null, Guid? eventId = null)
{
if (!_rabbitMqConnections.IsConnected)
{
_rabbitMqConnections.TryConnect();
}
using (var channel = _rabbitMqConnections.CreateModel())
{
// durable 设置队列持久化
channel.ExchangeDeclare(RabbitMqEventBusOptions.ExchangeName, "direct", durable: true);
if (properties == null)
{
properties = channel.CreateBasicProperties();
// 设置消息持久化
properties.DeliveryMode = 2;
}
if (properties.MessageId.IsNullOrEmpty())
{
// 消息的唯一性标识
properties.MessageId = (eventId ?? Guid.NewGuid()).ToString("N");
}
SetEventMessageHeaders(properties, headersArguments);
channel.BasicPublish(
exchange: RabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(body)
);
}
return Task.CompletedTask;
}
private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments)
{
if (headersArguments == null)
{
return;
}
properties.Headers ??= new Dictionary<string, object>();
foreach (var header in headersArguments)
{
properties.Headers[header.Key] = header.Value;
}
}
然后就是消费者的处理,我们同样定义Initialize
函数,并简化部分封装代码,完成消费者启动。
public void Initialize()
{
Exchange = new ExchangeDeclareConfiguration(RabbitMqEventBusOptions.ExchangeName,"direct",true);
Queue = new QueueDeclareConfiguration(RabbitMqEventBusOptions.ClientName, true, false, false);
// 启动一个消费者
if (!_rabbitMqConnections.IsConnected)
{
_rabbitMqConnections.TryConnect();
}
try
{
Channel = _rabbitMqConnections.CreateModel();
Channel.ExchangeDeclare(
exchange: Exchange.ExchangeName,
type: Exchange.Type,
durable: Exchange.Durable,
autoDelete: Exchange.AutoDelete,
arguments: Exchange.Arguments
);
Channel.QueueDeclare(
queue: Queue.QueueName,
durable: Queue.Durable,
exclusive: Queue.Exclusive,
autoDelete: Queue.AutoDelete,
arguments: Queue.Arguments
);
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += HandleIncomingMessageAsync;
Channel.BasicConsume(
queue: Queue.QueueName,
autoAck: false,
consumer: consumer
);
SubscribeHandlers(DistributedEventBusOptions.Handlers);
}
catch (Exception ex)
{
Console.WriteLine("Error:" + ex.Message);
}
}
参数配置这边主要是读取AppSetting信息和索要Handler
public static class DpptEventBusRabbitMqRegistrar
{
public static void AddDpptEventBusRabbitMq(this IServiceCollection services, IConfiguration configuration, List<Type> types)
{
services.AddSingleton<IRabbitMqConnections>(sp =>
{
var logger = sp.GetRequiredService<ILogger<RabbitMqConnections>>();
var factory = new ConnectionFactory()
{
HostName = configuration["RabbitMQ:EventBusConnection"],
VirtualHost = configuration["RabbitMQ:EventBusVirtualHost"],
DispatchConsumersAsync = true,
AutomaticRecoveryEnabled = true
};
if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusUserName"]))
{
factory.UserName = configuration["RabbitMQ:EventBusUserName"];
}
if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusPassword"]))
{
factory.Password = configuration["RabbitMQ:EventBusPassword"];
}
return new RabbitMqConnections(factory, logger);
});
var distributedHandlers = types;
foreach (var item in distributedHandlers)
{
services.AddSingleton(item);
}
services.Configure<DistributedEventBusOptions>(options =>
{
options.Handlers.AddIfNotContains(distributedHandlers);
});
services.Configure<DpptRabbitMqEventBusOptions>(options => {
options.ExchangeName = configuration["RabbitMQ:EventBus:ExchangeName"];
options.ClientName = configuration["RabbitMQ:EventBus:ClientName"];
});
services.AddSingleton<IDistributedEventBus, RabbitMqDistributedEventBus>();
}
}
测试
新建一个空项目,进行插件注册,然后创建ETO和Handler进行测试。
测试结果放在下面了。
结语
本次挑选了一个比较简单的示例来讲,整个EventBus我应该分成3篇 下一篇我来讲分布式事务。
最后欢迎各位读者关注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 欢迎大家Star
另外这里有个社区地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技术点希望我提前档期可以写在这里,希望本项目助力我们一起成长