ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

时间:2024-01-13 19:46:08

上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。

事实上,不仅仅是在事件处理器中,我们需要关心对象的生命周期,在整个ASP.NET Core Web API的应用程序里,我们需要理解并仔细推敲被注册到IoC容器中的服务,它们的生命周期应该是个怎样的情形,这也是服务端应用程序设计必须认真考虑的内容。因为如果生命周期管理不合理,程序申请的资源无法合理释放,最后便会带来内存泄漏、程序崩溃等各种问题,然而这样的问题对于服务端应用程序来说,是非常严重的。

记得在上一篇文章的结束部分,我给大家留下一个练习,就是让大家在CustomerCreatedEventHandler事件处理器的HandleAsync方法中,填入自己的代码,以便对获得的事件消息做进一步的处理。作为本文的引子,我们首先将这部分工作做完,然后再进一步分析生命周期的问题。

Event Store

Event Store是CQRS体系结构模式中最为重要的一个组成部分,它的主要职责就是保存发生于领域模型中的领域事件,并对事件数据进行归档。当仓储需要获取领域模型对象时,Event Store也会配合快照数据库一起,根据领域事件的发生顺序,逐步回放并重塑领域模型对象。事实上,Event Store的实现是非常复杂的,虽然从它的职责上来看并不算太复杂,然而它所需要解决的事件同步、快照、性能、消息派发等问题,使得CQRS体系结构的实现变得非常复杂。在实际应用中,已经有一些比较成熟的框架和工具集,能够帮助我们在CQRS中很方便地实现Event Store,比如GetEventStore就是一个很好的开源Event Store框架,它是基于.NET开发的,在微软官方的eShopOnContainers说明文档中,也提到了这个框架,推荐大家上他们的官网(https://eventstore.org/)了解一下。在这里我们就先不深入研究Event Store应该如何实现,我们先做一个简单的Event Store,以便展示我们需要讨论的问题。

延续着上一版的代码库(https://github.com/daxnet/edasample/tree/chapter_1),我们首先在EdaSample.Common.Events命名空间下,定义一个IEventStore的接口,这个接口非常简单,仅仅包含一个保存事件的方法,代码如下:

public interface IEventStore : IDisposable
{
Task SaveEventAsync<TEvent>(TEvent @event)
where TEvent : IEvent;
}

SaveEventAsync方法仅有一个参数:由泛型类型TEvent绑定的@event对象。泛型约束表示SaveEventAsync方法仅能接受IEvent接口及其实现类型的对象作为参数传入。接口定义好了,下一步就是实现这个接口,对传入的事件对象进行保存。为了实现过程的简单,我们使用Dapper,将事件数据保存到SQL Server数据库中,来模拟Event Store对事件的保存操作。

Note:为什么IEventStore接口的SaveEventAsync方法签名中,没有CancellationToken参数?严格来说,支持async/await异步编程模型的方法定义上,是需要带上CancellationToken参数的,以便调用方请求取消操作的时候,方法内部可以根据情况对操作进行取消。然而有些情况下取消操作并不是那么合理,或者方法内部所使用的API并没有提供更深层的取消支持,因此也就没有必要在方法定义上增加CancellationToken参数。在此处,为了保证接口的简单,没有引入CancellationToken的参数。

接下来,我们实现这个接口,并用Dapper将事件数据保存到SQL Server中。出于框架设计的考虑,我们新建一个Net Standard Class Library项目,在这个新的项目中实现IEventStore接口,这么做的原因已经在上文中介绍过了。代码如下:

public class DapperEventStore : IEventStore
{
private readonly string connectionString; public DapperEventStore(string connectionString)
{
this.connectionString = connectionString;
} public async Task SaveEventAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
const string sql = @"INSERT INTO [dbo].[Events]
([EventId], [EventPayload], [EventTimestamp])
VALUES
(@eventId, @eventPayload, @eventTimestamp)";
using (var connection = new SqlConnection(this.connectionString))
{
await connection.ExecuteAsync(sql, new
{
eventId = @event.Id,
eventPayload = JsonConvert.SerializeObject(@event),
eventTimestamp = @event.Timestamp
});
}
} #region IDisposable Support
// 此处省略
#endregion
}

IDisposable接口的实现部分暂且省略,可以看到,实现还是非常简单的:通过构造函数传入数据库的连接字符串,在SaveEventAsyc方法中,基于SqlConnection对象执行Dapper的扩展方法来完成事件数据的保存。

Note: 此处使用了JsonConvert.SerializeObject方法来序列化事件对象,也就意味着DapperEventStore程序集需要依赖Newtonsoft.Json程序集。虽然在我们此处的案例中不会有什么影响,但这样做会造成DapperEventStore对Newtonsoft.Json的强依赖,这样的依赖关系不仅让DapperEventStore变得不可测试,而且Newtonsoft.Json将来未知的变化,也会影响到DapperEventStore,带来一些不确定性和维护性问题。更好的做法是,引入一个IMessageSerializer接口,在另一个新的程序集中使用Newtonsoft.Json来实现这个接口,同时仅让DapperEventStore依赖IMessageSerializer,并在应用程序启动时,将Newtonsoft.Json的实现注册到IoC容器中。此时,IMessageSerializer可以被Mock,DapperEventStore就变得可测试了;另一方面,由于只有那个新的程序集会依赖Newtonsoft.Json,因此,Newtonsoft.Json的变化也仅仅会影响那个新的程序集,不会对框架主体的其它部分造成任何影响。

EventStore实现好了,接下来,我们将其用在CustomerCreatedEventHandler中,以便将订阅的CustomerCreatedEvent保存下来。

事件数据的保存

保存事件数据的第一步,就是在ASP.NET Core Web API的IoC容器中,将DapperEventStore注册进去。这一步是非常简单的,只需要在Startup.cs的ConfigureServices方法中完成即可。代码如下:

public void ConfigureServices(IServiceCollection services)
{
services.AddMvc(); services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"]));
services.AddSingleton<IEventBus, PassThroughEventBus>();
}

注意我们使用的是services.AddTransient方法来注册DapperEventStore,我们希望应用程序在每次请求IEventStore实例时,都能获得一个新的DapperEventStore的实例。

接下来,打开CustomerCreatedEventHandler.cs文件,在构造函数中加入对IEventStore的依赖,然后修改HandleAsync方法,在该方法中使用IEventStore的实例来完成事件数据的保存。代码如下:

public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
{
private readonly IEventStore eventStore; public CustomerCreatedEventHandler(IEventStore eventStore)
{
this.eventStore = eventStore;
} public bool CanHandle(IEvent @event)
=> @event.GetType().Equals(typeof(CustomerCreatedEvent)); public async Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)
{
await this.eventStore.SaveEventAsync(@event);
return true;
} public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)
=> CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
}

OK,代码修改完毕,测试一下。

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

看看数据库中客户信息是否已经创建:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

看看数据库中事件数据是否已经保存成功:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

OK,数据全部保存成功。

然而,事情真的就这么简单么?No。在追踪了IEventStore实例(也就是DapperEventStore)的生命周期后,你会发现,问题没有想象的那么简单。

追踪对象的生命周期

在使用services.AddTransient/AddScoped/AddSingleton/AddScoped这些方法对服务进行注册时,使用不同的方法也就意味着选择了不同的对象生命周期。在此我们也不再深入讨论每种方法之间的差异,微软官方有详细的文档和demo(抱歉我没有贴出中文链接,因为机器翻译的缘故,实在有点不堪入目),如果对ASP.NET Core的IoC容器不熟悉的话,建议先了解一下官网文章的内容。在上面我稍微提了一下,我们是用AddTransient方法来注册DapperEventStore的,因为我们希望在每次使用IEventStore的时候,都会有一个新的DapperEventStore被创建。现在,让我们来验证一下,看情况是否果真如此。

日志的使用

追踪程序执行的最有效的方式就是使用日志。在我们的场景中,使用基于文件的日志会更合适,因为这样我们可以更清楚地看到程序的执行过程以及对象的变化过程。同样,我不打算详细介绍如何在ASP.NET Core Web API中使用日志,微软官网同样有着非常详尽的文档来介绍这些内容。在这里,我简要地将相关代码列出来,以介绍如何启用基于文件的日志系统。

首先,在Web API服务的项目上,添加对Serilog.Extensions.Logging.File的nuget包,使用它能够非常方便地启用基于文件的日志。然后,打开Program.cs文件,添加ConfigureLogging的调用:

public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.ConfigureLogging((context, lb) =>
{
lb.AddFile(LogFileName);
})
.UseStartup<Startup>()
.Build();

此处LogFileName为本地文件系统中的日志文件文件名,为了避免权限问题,我将日志写入C:\Users\<user>\appdata\local目录下,因为我的Web API进程是由当前登录用户启动的,所以写在这个目录下不会有权限问题。如果今后我们把Web API host在IIS中,那么启动IIS服务的用户需要对日志所在的目录具有写入的权限,日志文件才能被正确写入,这一点是需要注意的。

好了,现在可以使用日志了,先试试看。在Startup类的构造函数中,加入ILoggerFactory参数,并在构造函数执行时获取ILogger实例,然后在ConfigureServices调用中输出一些内容:

public class Startup
{
private readonly ILogger logger; public Startup(IConfiguration configuration, ILoggerFactory loggerFactory)
{
Configuration = configuration;
this.logger = loggerFactory.CreateLogger<Startup>();
} public IConfiguration Configuration { get; } public void ConfigureServices(IServiceCollection services)
{
this.logger.LogInformation("正在对服务进行配置..."); services.AddMvc(); services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
services.AddTransient<IEventStore>(serviceProvider =>
new DapperEventStore(Configuration["mssql:connectionString"]));
services.AddSingleton<IEventBus, PassThroughEventBus>();
this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
} // 其它方法暂时省略
}

现在重新启动服务,然后查看日志文件,发现日志可以被正确输出:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

接下来,使用类似的方式,向PassThroughEventBus的构造函数和Dispose方法中加入一些日志输出,在CustomersController的Create方法中、CustomerCreatedEventHandler的构造函数和HandleAsync方法中、DapperEventStore的构造函数和Dispose方法中也加入一些日志输出,以便能够观察当新的客户信息被创建时,Web API的执行过程。限于文章篇幅,就不在此一一贴出各方法中加入日志输出的代码了,大家可以根据本文最后所提供的源代码链接来获取源代码。简单地举个例子吧,比如对于DapperEventStore,我们通过构造函数注入ILogger的实例:

public class DapperEventStore : IEventStore
{
private readonly string connectionString;
private readonly ILogger logger; public DapperEventStore(string connectionString,
ILogger<DapperEventStore> logger)
{
this.connectionString = connectionString;
this.logger = logger;
logger.LogInformation($"DapperEventStore构造函数调用完成。Hash Code:{this.GetHashCode()}.");
}
// 其它函数省略
}

这样一来,在DapperEventStore的其它方法中,就可以通过logger来输出日志了。

发现问题

同样,再次运行Web API,并通过Powershell发起一次创建客户信息的请求,然后打开日志文件,整个程序的执行过程基本上就一目了然了:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

从上面的日志内容可以得知,当应用程序正常退出时,由IoC容器托管的PassThroughEventBus和DapperEventStore都能够被正常Dispose,目前看来没什么问题,因为资源可以正常释放。现在让我们重新启动Web API,连续发送两次创建客户信息的请求,再次查看日志,我们得到了下面的内容:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

从上面的日志内容可以看到,在Web API的整个运行期间,CustomerCreatedEventHandler仅被构造了一次,而且在每次处理CustomerCreatedEvent事件的时候,都是使用同一个DapperEventStore实例来保存事件数据。也就是说,CustomerCreatedEventHandler和DapperEventStore在整个Web API服务的生命周期中,有且仅有一个实例,它们是Singleton的!然而,在进行系统架构的时候,我们应该尽量保证较短的对象生命周期,以免因为状态的不一致性导致不可回滚的错误出现,这也是架构设计中的一种最佳实践。虽然目前我们的DapperEventStore在程序正常退出的时候能够被Dispose掉,但如果DapperEventStore使用了非托管资源,并且非托管资源并没有很好地管理自己的内存呢?久而久之,DapperEventStore就产生了内存泄漏点,慢慢地,Web API就会出现内存泄漏,系统资源将被耗尽。假如Web API被部署在云中,应用程序监控装置(比如AWS的Cloud Watch)就会持续报警,并强制服务断线,整个系统的可用性就无法得到保障。所以,我们更期望DapperEventStore能够正确地实现C#的Dispose模式,在Dispose方法中合理地释放资源,并且仅在需要使用DapperEventStore时候才去构建它,用完就及时Dispose,以保证资源的合理使用。这也就是为什么我们使用services.AddTransient方法来注册CustomerCreatedEventHandler以及DapperEventStore的原因。

然而,事实却并非如此。究其原因,就是因为PassThroughEventBus是单例实例,它的生命周期是整个Web API服务。而在PassThroughEventBus的构造函数中,CustomerCreatedEventHandler被作为参数传入,于是,PassThroughEventBus产生了对CustomerCreatedEventHandler的依赖,而连带地也产生了对DapperEventStore的依赖。换句话说,在整个应用程序运行的过程中,IoC框架完全没有理由再去创建新的CustomerCreatedEventHandler以及DapperEventStore的实例,因为事件处理器作为强引用被注册到PassThroughEventBus中,而PassThroughEventBus至始至终没有变过!

Note:为什么PassThroughEventBus可以作为单例注册到IoC容器中?因为它提供了无状态的全局性的基础结构层服务:事件总线。在PassThroughEventBus的实现中,这种全局性体现得不明显,我们当然可以每一次HTTP请求都创建一个新的PassThroughEventBus来转发事件消息并作处理。然而,在今后我们要实现的基于RabbitMQ的事件总线中,如果我们还是每次HTTP请求都创建一个新的消息队列,不仅性能得不到保证,而且消息并不能路由到新创建的channel上。注意:我们将其注册成单例,一个很重要的依据是由于它是无状态的,但即使如此,我们也要注意在应用程序退出的时候,合理Dispose掉它所占用的资源。当然,在这里,ASP.NET Core的IoC机制会帮我们解决这个问题(因为我注册了PassThroughEventBus,但我没有显式调用Dispose方法,我仍然能从日志中看到“PassThroughEventBus已经被Dispose”的字样),然而有些情况下,ASP.NET Core不会帮我们做这些,就需要我们自己手工完成。

OMG!由于构造函数注入,使得对象之间产生了依赖关系,从而影响到了它们的生命周期,这可怎么办?既然问题是由依赖引起的,那么就需要想办法解耦。

解耦!解决事件处理器对象生命周期问题

经过分析,我们需要解除PassThroughEventBus对各种EventHandler的直接依赖。因为PassThroughEventBus是单例的,那么由它引用的所有组件也只可能具有相同的生命周期。然而,这样的解耦又该如何做呢?将EventHandler封装到另一个类中?结果还是一样,PassThroughEventBus总会通过某种对象关系,来间接引用到EventHandler上,造成EventHandler全局唯一。

或许,应该要有另一套生命周期管理体系来管理EventHandler的生命周期,使得每当PassThroughEventBus需要使用EventHandler对所订阅的事件进行处理的时候,都会通过这套体系来请求新的EventHandler实例,这样一来,PassThroughEventBus也就不再依赖于某个特定的实例了,而仅仅是引用了各种EventHandler在新的生命周期管理体系中的注册信息。每当需要的时候,PassThroughEventBus都会将事件处理器的注册信息传给新的管理体系,然后由这套新的体系来维护事件处理器的生命周期。

通过阅读微软官方的eShopOnContainers案例代码后,证实了这一想法。在案例中,有如下代码:

// namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private async Task ProcessEvent(string eventName, string message)
{
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions)
{
if (subscription.IsDynamic)
{
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
dynamic eventData = JObject.Parse(message);
await handler.Handle(eventData);
}
else
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var handler = scope.ResolveOptional(subscription.HandlerType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
}
}
}
}
}

可以看到,高亮的这一行,通过Autofac创建了一个新的LifetimeScope,在这个Scope中,通过eventName来获得一个subscription对象(也就是EventHandler的注册信息),进而通过scope的ResolveOptional调用来获得新的EventHandler实例。基本过程就是这样,目前也不需要纠结IDynamicIntegrationEventHandler是干什么用的,也不需要纠结为什么要使用dynamic来保存事件数据。重点是,autofac的BeginLifetimeScope方法调用创建了一个新的IoC Scope,在这个Scope中解析(resolve)了新的EventHandler实例。在eShopOnContainer案例中,EventBusRabbitMQ的设计是特定的,必须依赖于Autofac作为依赖注入框架。或许这部分设计可以进一步改善,使得EventBusRabbitMQ不会强依赖于Autofac。

接下来,我们会引入一个新的概念:事件处理器执行上下文,使用类似的方式来解决对象生命周期问题。

事件处理器执行上下文

事件处理器执行上下文(Event Handler Execution Context, EHEC)为事件处理器提供了一个完整的生命周期管理机制,在这套机制中,事件处理器及其引用的对象资源可以被正常创建和正常销毁。现在让我们一起看看,如何在EdaSample的案例代码中使用事件处理器执行上下文。

事件处理器执行上下文的接口定义如下,当然,这部分接口是放在EdaSample.Common.Events目录下,作为消息系统的框架代码提供给调用方:

public interface IEventHandlerExecutionContext
{
void RegisterHandler<TEvent, THandler>()
where TEvent : IEvent
where THandler : IEventHandler<TEvent>; void RegisterHandler(Type eventType, Type handlerType); bool HandlerRegistered<TEvent, THandler>()
where TEvent : IEvent
where THandler : IEventHandler<TEvent>; bool HandlerRegistered(Type eventType, Type handlerType); Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default);
}

这个接口主要包含三种方法:注册事件处理器、判断事件处理器是否已经注册,以及对接收到的事件消息进行处理。整个结构还是非常清晰简单的。现在需要实现这个接口。根据上面的分析,这个接口的实现是需要依赖于IoC容器的,目前简单起见,我们仅使用微软ASP.NET Core标准的Dependency Injection框架来实现,当然,也可以使用Autofac,取决于你怎样去实现上面这个接口。需要注意的是,由于该接口的实现是需要依赖于第三方组件的(在这里是微软的Dependency Injection框架),因此,最佳做法是新建一个类库,并引用EdaSample.Common程序集,并在这个新的类库中,依赖Dependency Injection框架来实现这个接口。

以下是基于Microsoft.Extensions.DependencyInjection框架来实现的事件处理器执行上下文完整代码,这里有个兼容性问题,就是构造函数的第二个参数:serviceProviderFactory。在Microsoft.Extensions.DependencyInjection框架2.0版本之前,IServiceCollection.BuildServiceProvider方法的返回类型是IServiceProvider,但从2.0开始,它的返回类型已经从IServiceProvider接口,变成了ServiceProvider类。这里引出了框架设计的另一个原则,就是依赖较低版本的.NET Core,以便获得更好的兼容性。如果我们的EdaSample是使用.NET Core 1.1开发的,那么当下面这个类被直接用在ASP.NET Core 2.0的项目中时,如果不通过构造函数参数传入ServiceProvider创建委托,而是直接在代码中使用registry.BuildServiceProvider调用,就会出现异常。

public class EventHandlerExecutionContext : IEventHandlerExecutionContext
{
private readonly IServiceCollection registry;
private readonly Func<IServiceCollection, IServiceProvider> serviceProviderFactory;
private readonly ConcurrentDictionary<Type, List<Type>> registrations = new ConcurrentDictionary<Type, List<Type>>(); public EventHandlerExecutionContext(IServiceCollection registry,
Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null)
{
this.registry = registry;
this.serviceProviderFactory = serviceProviderFactory ?? (sc => registry.BuildServiceProvider());
} public async Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken))
{
var eventType = @event.GetType();
if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypes) &&
handlerTypes?.Count > 0)
{
var serviceProvider = this.serviceProviderFactory(this.registry);
using (var childScope = serviceProvider.CreateScope())
{
foreach(var handlerType in handlerTypes)
{
var handler = (IEventHandler)childScope.ServiceProvider.GetService(handlerType);
if (handler.CanHandle(@event))
{
await handler.HandleAsync(@event, cancellationToken);
}
}
}
}
} public bool HandlerRegistered<TEvent, THandler>()
where TEvent : IEvent
where THandler : IEventHandler<TEvent>
=> this.HandlerRegistered(typeof(TEvent), typeof(THandler)); public bool HandlerRegistered(Type eventType, Type handlerType)
{
if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypeList))
{
return handlerTypeList != null && handlerTypeList.Contains(handlerType);
} return false;
} public void RegisterHandler<TEvent, THandler>()
where TEvent : IEvent
where THandler : IEventHandler<TEvent>
=> this.RegisterHandler(typeof(TEvent), typeof(THandler)); public void RegisterHandler(Type eventType, Type handlerType)
{
Utils.ConcurrentDictionarySafeRegister(eventType, handlerType, this.registrations);
this.registry.AddTransient(handlerType);
}
}

好了,事件处理器执行上下文就定义好了,接下来就是在我们的ASP.NET Core Web API中使用。为了使用IEventHandlerExecutionContext,我们需要修改事件订阅器的接口定义,并相应地修改PassThroughEventBus以及Startup.cs。代码如下:

// IEventSubscriber
public interface IEventSubscriber : IDisposable
{
void Subscribe<TEvent, TEventHandler>()
where TEvent : IEvent
where TEventHandler : IEventHandler<TEvent>;
} // PassThroughEventBus
public sealed class PassThroughEventBus : IEventBus
{
private readonly EventQueue eventQueue = new EventQueue();
private readonly ILogger logger;
private readonly IEventHandlerExecutionContext context; public PassThroughEventBus(IEventHandlerExecutionContext context,
ILogger<PassThroughEventBus> logger)
{
this.context = context;
this.logger = logger;
logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."); eventQueue.EventPushed += EventQueue_EventPushed;
} private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
=> await this.context.HandleEventAsync(e.Event); public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : IEvent
=> Task.Factory.StartNew(() => eventQueue.Push(@event)); public void Subscribe<TEvent, TEventHandler>()
where TEvent : IEvent
where TEventHandler : IEventHandler<TEvent>
{
if (!this.context.HandlerRegistered<TEvent, TEventHandler>())
{
this.context.RegisterHandler<TEvent, TEventHandler>();
}
} #region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
this.eventQueue.EventPushed -= EventQueue_EventPushed;
logger.LogInformation($"PassThroughEventBus已经被Dispose。Hash Code:{this.GetHashCode()}.");
} disposedValue = true;
}
}
public void Dispose() => Dispose(true); #endregion
} // Startup.cs
public void ConfigureServices(IServiceCollection services)
{
this.logger.LogInformation("正在对服务进行配置..."); services.AddMvc(); services.AddTransient<IEventStore>(serviceProvider =>
new DapperEventStore(Configuration["mssql:connectionString"],
serviceProvider.GetRequiredService<ILogger<DapperEventStore>>())); var eventHandlerExecutionContext = new EventHandlerExecutionContext(services,
sc => sc.BuildServiceProvider());
services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
services.AddSingleton<IEventBus, PassThroughEventBus>(); this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
} // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>(); if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
} app.UseMvc();
}

代码修改完成后,再次执行Web API,并发送两次(或多次)创建客户的请求,然后查看日志,我们发现,每次请求都会使用新的事件处理器去处理接收到的消息,在保存消息数据时,会使用新的DapperEventStore来保存数据,而保存完成后,会及时将DapperEventStore dispose掉:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

小结

本文篇幅比较长,或许你没有太多耐心将文章读完。但我尽量将问题分析清楚,希望提供给读者的内容是详细的、有理有据的。文章中黑体部分是在设计过程中的一些思考和需要注意的地方,希望能够给读者在工作和学习之中带来启发和收获。总而言之,对象生命周期的管理,在服务端应用程序中是非常重要的,需要引起足够的重视。在下文中,我们打算逐步摆脱PassThroughEventBus,基于RabbitMQ来实现消息总线的基础结构。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_2这个tag,如下:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理