在上文中,我们讨论了事件措置惩罚惩罚器中东西生命周期的问题,在进入新的讨论之前,首先让我们总结一下,我们已经实现了哪些内容。下面的类图描述了我们已经实现的组件及其之间的关系,貌似系统已经变得越来越庞大了。
此中绿色的部分就是上文中新实现的部分,包孕一个简单的Event Store,一个事件措置惩罚惩罚器执行上下文的接口,以及一个基于ASP.NET Core依赖注入框架的执行上下文的实现。接下来,我们筹算裁减PassThroughEventBus,然后基于RabbitMQ实现一套新的事件总线。
事件总线的重构按照前面的结论,事件总线的执行需要依赖于事件措置惩罚惩罚器执行上下文,也就是上面类图中PassThroughEventBus对付IEventHandlerExecutionContext的引用。更具体些,是在事件总线订阅某种类型的事件时,需要将事件措置惩罚惩罚器注册到IEventHandlerExecutionContext中。那么在实现RabbitMQ时,也会有着类似的设计需求,即RabbitMQEventBus也需要依赖IEventHandlerExecutionContext接口,以保证事件措置惩罚惩罚器生命周期的合理性。
为此,我们新建一个基类:BaseEventBus,并将这部分大众的代码提取出来,需要注意以下几点:
通过BaseEventBus的结构函数传入IEventHandlerExecutionContext实例,也就限定了所有子类的实现中,必需在结构函数中传入IEventHandlerExecutionContext实例,这对付框架的设计非常有利:在实现新的事件总线时,框架的使用者无需检察API文档,即可知道事件总线与IEventHandlerExecutionContext之间的关系,这切合SOLID原则中的Open/Closed Principle
BaseEventBus的实现应该放在EdaSample.Common措施集中,更确切地说,它应该放在EdaSample.Common.Events定名空间下,因为它是属于框架级另外组件,并且不会依赖任何根本布局层的组件
BaseEventBus的代码如下:
public abstract class BaseEventBus : IEventBus { protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext; protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext) { this.eventHandlerExecutionContext = eventHandlerExecutionContext; } public abstract Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent; public abstract void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler<TEvent>; // Disposable接口实现代码省略 }在上面的代码中,PublishAsync和Subscribe要领是抽象要领,以便子类按照差此外需要来实现。
接下来就是调解PassThroughEventBus,使其担任于BaseEventBus:
public sealed class PassThroughEventBus : BaseEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly ILogger logger; public PassThroughEventBus(IEventHandlerExecutionContext context, ILogger<PassThroughEventBus> logger) : base(context) { this.logger = logger; logger.LogInformation($"PassThroughEventBus结构函数挪用完成。Hash Code:{this.GetHashCode()}."); eventQueue.EventPushed += EventQueue_EventPushed; } private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e) => await this.eventHandlerExecutionContext.HandleEventAsync(e.Event); public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) { return Task.Factory.StartNew(() => eventQueue.Push(@event)); } public override void Subscribe<TEvent, TEventHandler>() { if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>()) { this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>(); } } // Disposable接口实现代码省略 }代码都很简单,也就不久不多做说明了,接下来,我们开始实现RabbitMQEventBus。
RabbitMQEventBus的实现首先需要新建一个.NET Standard 2.0的项目,使用.NET Standard 2.0的项目模板所创建的项目,可以同时被.NET Framework 4.6.1或者.NET Core 2.0的应用措施所引用。创建新的类库项目的目的,是因为RabbitMQEventBus的实现需要依赖RabbitMQ C#开发库这个外部引用。因此,为了保证框架核心的纯净和不变,需要在新的类库项目中实现RabbitMQEventBus。
Note:对付RabbitMQ及其C#库的介绍,本文就不再涉及了,网上有很多资料和文档,博客园有很多伴侣在这方面都有使用经验分享,RabbitMQ官方文档也写得非常详细,固然是英文版的,,如果英语对照好的话,建议参考官方文档。
以下就是在EdaSample案例中,RabbitMQEventBus的实现,我们先读一读代码,再对这部分代码做些分析。
public class RabbitMQEventBus : BaseEventBus { private readonly IConnectionFactory connectionFactory; private readonly IConnection connection; private readonly IModel channel; private readonly string exchangeName; private readonly string exchangeType; private readonly string queueName; private readonly bool autoAck; private readonly ILogger logger; private bool disposed; public RabbitMQEventBus(IConnectionFactory connectionFactory, ILogger<RabbitMQEventBus> logger, IEventHandlerExecutionContext context, string exchangeName, string exchangeType = ExchangeType.Fanout, string queueName = null, bool autoAck = false) : base(context) { this.connectionFactory = connectionFactory; this.logger = logger; this.connection = this.connectionFactory.CreateConnection(); this.channel = this.connection.CreateModel(); this.exchangeType = exchangeType; this.exchangeName = exchangeName; this.autoAck = autoAck; this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType); this.queueName = this.InitializeEventConsumer(queueName); logger.LogInformation($"RabbitMQEventBus结构函数挪用完成。Hash Code:{this.GetHashCode()}."); } public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken)) { var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); var eventBody = Encoding.UTF8.GetBytes(json); channel.BasicPublish(this.exchangeName, @event.GetType().FullName, null, eventBody); return Task.CompletedTask; } public override void Subscribe<TEvent, TEventHandler>() { if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>()) { this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>(); this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName); } } protected override void Dispose(bool disposing) { if (!disposed) { if (disposing) { this.channel.Dispose(); this.connection.Dispose(); logger.LogInformation($"RabbitMQEventBus已经被Dispose。Hash Code:{this.GetHashCode()}."); } disposed = true; base.Dispose(disposing); } } private string InitializeEventConsumer(string queue) { var localQueueName = queue; if (string.IsNullOrEmpty(localQueueName)) { localQueueName = this.channel.QueueDeclare().QueueName; } else { this.channel.QueueDeclare(localQueueName, true, false, false, null); } var consumer = new EventingBasicConsumer(this.channel); consumer.Received += async (model, eventArgument) => { var eventBody = eventArgument.Body; var json = Encoding.UTF8.GetString(eventBody); var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); await this.eventHandlerExecutionContext.HandleEventAsync(@event); if (!autoAck) { channel.BasicAck(eventArgument.DeliveryTag, false); } }; this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer); return localQueueName; } }阅读上面的代码,需要注意以下几点:
正如上面所述,结构函数需要接受IEventHandlerExecutionContext东西,并通过结构函数的base挪用,将该东西通报给基类
结构函数中,queueName参数是可选参数,也就是说: