CQRS之旅——旅程7(增加弹性和优化性能)

时间:2024-02-01 22:41:58

旅程7:增加弹性和优化性能

到达旅程的终点:最后的任务。
“你不能飞的像一只长着鹪鹩翅膀的老鹰那样。”亨利·哈德逊

我们旅程的最后阶段的三个主要目标是使系统对故障更具弹性,提高UI的响应能力,并确保我们的设计是可伸缩的。加强系统的工作主要集中在订单和注册限界上下文中的RegistrationProcessManager类。性能改进工作的重点是当订单创建时UI与领域域模型的交互方式。

本章的工作术语定义:

本章使用了一些术语,我们将在下面进行描述。有关更多细节和可能的替代定义,请参阅参考指南中的“深入CQRS和ES”。

  • 命令(Command):命令是要求系统执行更改系统状态的操作。命令是必须服从(执行)的一种指令,例如:MakeSeatReservation。在这个限界上下文中,命令要么来自用户发起请求时的UI,要么来自流程管理器(当流程管理器指示聚合执行某个操作时)。命令由单个接收方只处理一次,命令要么通过命令总线(command bus)传递给接收方,要么直接在进程中传递。如果是通过总线传递的,则该命令是异步发送的,在进程中传递,命令将同步发送。

  • 事件(Event):一个事件,比如OrderConfirmed,描述了系统中发生的一些事情,通常是一个命令的结果。领域模型中的聚合引发事件。事件也可以来自其他限界上下文。多个订阅者可以处理特定的事件。聚合将事件发布到事件总线。处理程序在事件总线上注册特定类型的事件,然后将事件传递给订阅服务器。在订单和注册限界上下文中,订阅者是流程管理器和读取模型生成器。

  • 快照(Snapshots):快照是一种可以应用于事件源的优化。在重新还原(rehydrated)聚合时,不需要重播与聚合相关的所有持久化事件,而是加载聚合状态的最新副本,然后只重播保存快照后持久的事件。通过这种方式,可以减少必须从事件存储里加载的数据量。

  • 幂等性(Idempotency):幂等性是一个操作的特性,这意味着该操作可以多次应用而不改变结果。例如,“将x的值设置为10”的操作是幂等的,而“将x的值加1”的操作不是幂等的。在消息传递环境中,如果消息可以多次传递而不改变结果,则消息是幂等的:这可能是因为消息本身的性质,也可能是因为系统处理消息的方式。

  • 最终一致性(Eventual consistency):最终一致性是一个一致性模型,它不能保证立即访问更新的值。对数据对象进行更新后,存储系统不保证对该对象的后续访问将返回更新后的值。然而,存储系统确实保证,如果在足够长的时间内没有对对象进行新的更新,那么最终所有访问都可以返回最后更新的值。

架构

该应用程序旨在部署到Microsoft Azure。在旅程的这个阶段,应用程序由两个角色组成,一个包含ASP.Net MVC Web应用程序的web角色和一个包含消息处理程序和领域对象的工作角色。应用程序在写端和读端都使用Azure SQL DataBase实例进行数据存储。在某些地方,应用程序还在写端使用Azure table,在读端使用Azure blob来存储数据。应用程序使用Azure服务总线来提供其消息传递基础设施。下图展示了这个高级体系结构。

在研究和测试解决方案时,可以在本地运行它,可以使用Azure compute emulator,也可以直接运行MVC web应用程序,并运行承载消息处理程序和领域域对象的控制台应用程序。在本地运行应用程序时,可以使用本地SQL Server Express数据库,并使用一个在SQL Server Express数据库实现的简单的消息传递基础设施。

有关运行应用程序的选项的更多信息,请参见附录1“发布说明”。

增加弹性

在旅程的这个阶段,团队研究了加强RegistrationProcessManager类的方法。该类负责管理订单和注册上下文中的聚合之间的交互,并确保它们彼此一致。如果要将限界上下文作为一个整体来维护其一致状态,那么流程管理器必须能够适应各种各样的故障条件。

通常,流程管理器接收传入的事件,然后在限界上下文内部基于流程管理器的状态发出一个或多个命令到聚合。当流程管理器发出命令时,它通常会更改自己的状态。

订单和注册限界上下文包含RegistrationProcessManager类。此流程管理器在此限界上下文中和支付限界上下文中负责通过路由事件和命令协调聚合的活动。因此,流程管理器负责确保这些限界上下文中的聚合正确地彼此同步。

Gary(CQRS专家)发言:

一个聚合决定了写模型中的一致性边界,这个边界和系统持久存储数据的一致性相关。流程管理器管理不同聚合(可能在不同的限界上下文中)之间的关系,并确保聚合最终彼此一致。

注册过程的失败可能对系统产生不利后果:聚合可能彼此不同步,这可能导致系统中出现不可预测的行为,或者一些进程可能最终成为僵尸进程,继续运行并使用资源,但永远不会完成。团队确定了以下与RegistrationProcessManager流程管理器相关的特定故障场景。流程管理器也许会:

  • 崩溃或者无法在发送任何命令之前和接收事件之后持久化其状态。这样消息处理器可能无法将事件标记为完成,因此在超时之后,将事件放回Topic订阅并重新处理。
  • 在发送任何命令之前和持久化其状态之后崩溃。这将使系统处于不一致的状态,因为流程管理器保存了其新的状态,但没有发送预期的命令。原始事件被放回Topic订阅并重新处理。
  • 未能标记事件已被处理。流程管理器将第二次处理该事件,因此在超时之后,系统将把该事件重新放到服务总线Topic订阅中。
  • 在等待它所期望的特定事件时超时。流程管理器无法继续处理并达到预期的最终状态。
  • 接收到一个流程管理器处于特定状态时不期望接收的事件。这可能表明在其他地方存在问题,这意味着流程管理器继续工作是不安全的。

这些设想可归纳为两个具体的问题:

  • RegistrationProcessManager成功地处理了一个事件,但是没有将其标记为完成。在事件自动返回到Azure服务总线Topic订阅之后,RegistrationProcessManager将再次处理该事件。
  • RegistrationProcessManager成功地处理一个事件,并将其标记为完成,但随后未能发送命令。

使系统能弹性的重新处理事件

如果流程管理器本身的行为是幂等的,那么如果它第二次接收并处理一个事件,则不会导致系统中的不一致。使流程管理器的行为具有幂等性,可以避免前三种故障条件中固有的问题。崩溃之后,您可以简单地重新启动流程管理器,并第二次重新处理传入的事件。

您可以让流程管理器发送的所有命令都是幂等的,来替代让流程管理器幂等。重新启动流程管理器可能会导致第二次发送命令,但如果这些命令是幂等的,则不会对流程或系统产生不利影响。要使此方法生效,您仍然需要修改流程管理器,以确保它至少发送一次所有命令。如果命令是幂等的,那么多次发送它们并不重要,但是如果根本不发送就很重要。

在V1版本中,大多数消息处理要么已经是幂等的,要么系统检测到重复的消息并将它们发送到dead-letter队列。例外情况是OrderPlaced事件和SeatsReserved事件,因此团队修改了系统V3版本处理这两个事件的方式,以解决这个问题。

确保始终发送命令

需要事务行为来确保当RegistrationProcessManager类保存其状态时,系统始终会发送命令。这要求团队实现一个伪事务,因为将Azure服务总线和SQL数据库表一起放到分布式事务中既不可取也不可行。

团队为V3版本所采用的解决方案是确保系统持久保存RegistrationProcessManager生成的所有命令,同时持久保存RegistrationProcessManager实例的状态。然后,系统尝试发送命令,并在成功发送之后将它们从存储中删除。每当从存储中加载RegistrationProcessManager实例时,系统还检查未发送的消息。

性能优化

在这个阶段,我们使用Visual Studio运行性能和压力测试,以分析响应时间并确定瓶颈。团队使用Visual Studio Load Test来模拟访问应用程序的不同用户数量,并在代码中添加了额外的跟踪,以记录时间信息,以便进行详细分析。团队在Azure中创建了性能测试环境,在Azure VM角色实例中运行测试控制器和测试代理。这使我们能够通过使用测试代理模拟不同数量的虚拟用户来测试Contoso会议管理系统在不同负载下的执行情况。

作为这项工作的结果,团队对系统进行了许多更改,以优化其性能。

Gary(CQRS专家)发言:

尽管在旅程中,团队在项目结束时进行了性能测试和优化工作,但通常在你想做的时候就做这个工作是有意义的,这可以解决可伸缩性问题并尽快加固代码。如果您正在构建自己的基础设施,并且需要能够处理高吞吐量,则尤其如此。

Markus(软件开发人员)发言:

因为实现CQRS模式会导致对组成系统的许多不同部分的职责进行非常清晰的分离,所以我们发现添加优化和加强相对容易,因为许多必要的更改在系统中都非常容易定位。

优化前的UI流程

当注册者创建一个订单时,她将访问UI中的以下页面序列。

  1. 注册页面。该页面根据最终一致的读模型显示会议的门票类型和当前可用的座位数量。注册者选择她想购买的每种座位类型的数量。
  2. 付款的页面。此页面显示订单摘要,其中包括一个总价和一个倒计时计时器,它告诉注册者座位将保留多久。注册者输入她的详细信息和首选的付款方式。
  3. 支付页面。这里模拟了一个第三方支付处理器。
  4. 注册成功页面。这将显示支付是否成功。它向注册者显示一个订单定位器代码,并链接到另一个页面,该页面允许注册者为参会者分配座位。

有关UI中的屏幕和流程的更多信息,请参阅第5章“准备发布V1版本”中的“基于任务的UI”一节。

在V2版本中,系统必须在注册页面付款页面之间处理以下命令和事件:

  • RegisterToConference
  • OrderPlaced
  • MakeSeatReservation
  • SeatsReserved
  • MarkSeatsAsReserved
  • OrderReservationCompleted
  • OrderTotalsCalculated

此外,MVC控制器在发送初始RegisterToConference命令之前通过查询读模型来填充订单,从而验证是否有足够的座位可用。

当团队使用Visual Studio Load Test和不同的用户负载模式来对应用程序做负载测试时,我们注意到高负载常常发生在UI等待领域完成其处理时和读模型接收写模型数据时。这样无法显示下一个页面。特别是,随着V2版本部署到中型的web和工作角色实例后,我们发现:

  • 对于每秒少于5个订单的恒定负载模式,所有订单都在5秒的窗口内处理。
  • 对于每秒8到10个订单之间的恒定负载模式,许多订单不能在5秒的窗口内处理。
  • 对于每秒8到10个订单之间的恒定负载模式,角色实例使用得不够理想(例如CPU使用率很低)。

备注:从UI在服务总线上发送初始命令到读模型中出现定价订单,从而使UI能够向用户显示下一个屏幕。5秒是我们希望看到的最大等待时间。

为了解决这个问题,团队确定了两个优化目标:UI和领域之间的交互,以及基础设施。我们决定首先处理UI和领域之间的交互。当这不能充分提高性能时,我们还进行了基础设施优化。

优化UI

团队与领域专家讨论了在UI向领域发送RegisterToConference命令之前,是否总是需要验证座位可用性。

Gary(CQRS专家)发言:

这个场景说明了与最终一致性相关的一些实际问题。读端(在本例中是定价订单视图模型)最终与写端保持一致。通常,当您实现CQRS模式时,您应该能够接受最终的一致性,而不需要在UI中等待更改传播到读取端。然而,在这种情况下,UI必须等待写模型传播到与特定顺序相关的读端信息。这可能表明原系统这一部分的分析和设计存在问题。

领域专家明确表示,系统应该在接受付款之前确认座位是否可用。Contoso不希望出售座位之后向注册人解释,这些座位是不可用的。因此,该团队寻找了简化流程的方法,直到注册者看到付款屏幕为止。

Beth(业务经理)发言:

这种谨慎的策略并不适用于所有情况。在某些情况下,即使不能立即完成订单,企业也可能宁愿接受这笔钱。企业可能知道库存很快就会补充,或者客户很乐意等待。在我们的场景中,尽管Contoso可以在没有票的情况下将钱退还给注册者,注册者也许仍然会购买机票,因为他以为系统已经确认过,这笔钱是没法退还的。所以这很明显是一个业务和领域专家要做的决策。

团队确定了对UI流的以下两个优化。

UI优化1

大多数情况下,会议有足够的座位,注册者不必相互争夺来预订座位。随着大会的门票接近售罄,只有很短的一段时间内,报名者才会争夺最后几个座位。

如果会议有足够的可用座位,那么注册者到达付款界面却发现系统无法预订座位的风险就很小。在这种情况下,V2版本里,在到达付款页面之前执行的一些处理可以在付款页面上当用户输入信息的时候异步发生,这样就减少了注册者在看到付款页面前经历延迟的机会。

Jana(软件架构师)发言:

从本质上讲,我们所依赖的事实是,预订会成功,所以避免了耗时的检查。但我们仍然要在注册人付款之前执行检查,以确保座位是可用的。

但是,如果控制器在发送RegisterToConference命令之前就检查并发现没有足够的座位来完成订单,则可以重新显示注册屏幕,使注册者能够根据当前可用性更新其订单。

Jana(软件架构师)发言:

对这一战略的一个可能改进是,在发送RegisterToConference命令之前,看看是否可能有足够的座位可用。这可以减少注册者在最后几个座位售罄时调整订单的次数。然而,这种场景发生的频率很低,可能不值得实现。

UI优化2

在V2版本中,MVC控制器不显示付款页面,直到领域发布OrderTotalsCalculated事件,并且系统更新了price-order视图模型。此事件是控制器显示屏幕之前发生的最后一个事件。

如果系统更早地计算总数并更新价格订单视图模型,控制器就可以更早地显示付款页面。团队确定,订单聚合可以在订单下单时计算总数,而不是在预订完成时计算总数。这将使UI流比V2版本更快的走到付款页面。

优化基础设施

“每天都有一些新的事实浮现,一些新的障碍那些威胁着我们的最严重的障碍。我想这就是为什么这款游戏如此值得一玩的原因。” 罗伯特·弗尔肯·斯科特

团队在旅程的这个阶段添加的第二组优化与系统的基础设施相关。这些更改同时处理了系统的性能和可伸缩性。下面的部分描述了我们在这里所做的最重要的更改。

异步发送和接收命令和事件

作为优化过程的一部分,团队更新了系统,以确保在服务总线上发送的所有消息都是异步发送的。这种优化旨在提高应用程序的总体响应能力,并提高消息的吞吐量。作为此更改的一部分,团队还使用了Transient Fault Handling Application Block来处理使用服务总线时遇到的任何瞬时错误。

Markus(软件开发人员)发言:

这种优化导致了对基础设施代码的重大更改。将异步调用与Transient Fault Handling Application Block相结合是复杂的,我们将受益于c# 4.5中的一些新的简化语法!

Jana(软件架构师)发言:

有关在使用Azure服务总线时帮助优化性能的其他经过验证的实践,请参阅本指南:Best Practices for Performance Improvements Using Service Bus Brokered Messaging

优化命令处理

V2版本对命令和事件使用相同的消息传递基础设施——Azure服务总线。团队评估了Contoso会议管理系统是否需要使用相同的基础设施发送所有命令消息。

在决定是否继续使用Azure服务总线传输所有命令消息时,我们考虑了许多因素。

  • 哪些命令(如果有的话)可以在进程中处理?
  • 如果处理一些进程中的命令,系统的弹性会降低吗?
  • 如果在进程中处理一些命令,会有显著的性能提升吗?

我们确定了一组命令,系统可以从会议web应用程序在进程中同步地发送这些命令。为了实现这种优化,我们必须向会议web应用程序添加一些基础设施元素(事件存储库、事件总线和事件发布者)。以前,这些基础设施元素只在系统的工作角色中。

异步命令是不存在的,它实际上是另一个事件。如果我必须接受一个你发给我的消息并且如果我不同意必须发出一个事件。那这就不是你要我做什么,而是你告诉我什么已经做完了。乍一看,这似乎只有一点点不同,但它有很多含义。

Why do lots of developers use one-way command messaging (async handling) when it's not needed? Greg Young - DDD/CQRS Group

对事件源使用快照

性能测试还发现了使用可用座位(SeatsAvailability)聚合的瓶颈,我们使用快照的形式解决了这个瓶颈。

Jana(软件架构师)发言:

一旦团队确定了这个瓶颈,就很容易实现和测试这个解决方案。我们在实现CQRS模式时所遵循的方法的优点之一是:我们可以在系统中进行小的局部更改。更新不需要我们去跨系统的多个部分进行复杂的更改。

当系统从事件存储中重新还原(rehydrates)聚合实例时,它必须加载并重播与该聚合实例关联的所有事件。这里可能的优化是存储聚合状态在最近某个时间点的滚动快照,以便系统只需要加载快照和后续事件,从而减少必须重新加载和重播的事件数量。在Contoso会议管理系统中,随着时间的推移,唯一可能会累积大量事件的聚合是可用座位(SeatsAvailability)聚合。我们决定使用Memento模式作为快照解决方案的基础,以便与可用座位(SeatsAvailability)聚合一起使用。我们实现的解决方案使用一个memento来捕获座位可用性聚合的状态,然后在缓存中保存一个memento的副本。然后,系统尝试处理缓存的数据,而不是总是从事件存储中重新加载聚合。

Gary(CQRS专家)发言:

通常,在事件源上下文中,快照是持久化的,而不是我们在项目中实现的临时本地缓存。

并行发布事件

就提高系统中事件消息的吞吐量而言,并行发布事件被证明是最重要的优化之一。为了得到最好的结果,团队进行了多次迭代:

  • 迭代1:这种方法使用并行。使用Parallel.ForEach方法和自定义分区(把消息分配到分区中),并设置并行度的上限。它还使用同步的Azure服务总线API调用来发布消息。
  • 迭代2:这种方法使用了一些异步API调用。它需要使用基于自定义信号量的节流来正确处理异步回调。
  • 迭代3:这种方法使用动态节流,它考虑到顺时故障,这些故障表明向特定Topic发送了太多的消息。这种方法使用异步的Azure服务总线API调用。

Jana(软件架构师)发言:

当系统从服务总线检索消息时,我们在SubscriptionReceiverSessionSubscriptionReceiver类中采用了相同的动态节流方法。

在订阅中过滤消息

另一个优化是向Azure服务总线Topic订阅添加过滤器,以避免读取那些稍后将被与订阅关联的处理程序忽略的消息。

Markus(软件开发人员)发言:

这里我们利用了Azure服务总线提供的特性。

为可用座位(SeatsAvailability)聚合创建专用接收器

这使可用座位(SeatsAvailability)聚合的接收者能够使用支持会话的订阅。这是为了确保每个聚合实例只有一个写入者,因为可用座位(SeatsAvailability)聚合是一个高争用的聚合。这阻止了我们在扩展时接收大量并发异常。

Jana(软件架构师)发言:

在其他地方,我们使用带有会话的订阅来保证事件的顺序。在本例中,我们使用会话是出于不同的原因——以确保每个聚合实例只有一个写入者。

缓存会议信息

这个优化缓存了会议web网站到处使用的几个读模型。它包含逻辑来决定如何基于特定会议的可用座位的数量来保持缓存中的数据:如果有很多空位,系统可以缓存数据很长一段时间,但是如果很少有空位就不缓存数据。

划分服务总线

团队还对服务总线进行了划分,以使应用程序更具可伸缩性,并避免在系统发送的消息量接近服务总线能够处理的最大吞吐量时进行节流。每个服务总线Topic可以由Azure中的不同节点处理,因此通过使用多个Topic,我们可以增加潜在的吞吐量。我们考虑了以下分区方案:

  • 为不同的消息类型使用不同的Topic。
  • 使用多个相似的Topic,并以循环方式监听和读取它们,以分散负载。

有关这些划分方案的详细讨论,请参阅Martin L. Abbott和Michael T. Fisher所写的《可伸缩性规则:Web站点伸缩的50个原则》(Addison-Wesley, 2011)中的第11章“异步通信和消息总线”。

我们决定为订单聚合和可用聚合发布的事件使用单独的Topic,因为这些聚合负责了通过服务总线流动的大多数事件。

Gary(CQRS专家)发言:

并不是所有的信息都具有相同的重要性。您还可以使用消息总线来处理单独的、按优先级排列的不同的消息类型,甚至可以考虑不为某些消息使用消息总线。

Jana(软件架构师)发言:

将服务总线与系统的任何其他关键组件一样对待。这意味着您应该确保您的服务总线可以伸缩。此外,请记住,并非所有数据对您的业务都具有相同的价值。仅仅因为您有一个服务总线,并不意味着所有东西都必须经过它。明智的做法是消除低价值、高成本的流量。

其他的一些优化

团队还执行了一些额外的优化,这些优化在下面的实现细节部分中列出。团队在这一阶段的主要目标是优化系统,以确保UI呈现对用户有足够好的响应。我们还可以执行其他优化,这将有助于进一步提高性能,并优化系统使用资源的方式。例如,团队考虑的进一步优化是扩展视图模型生成器,该生成器填充系统中的各种读取模型。每个承载视图模型生成器实例的web角色都必须通过创建对Azure服务总线主题的订阅来处理写端发布的事件。

提高性能的进一步更改

除了在提高应用程序性能的旅程的最后阶段所做的更改之外,团队还确定了一些其他更改,这些更改将导致进一步的改进。但是,这个旅程的可用时间有限,所以不可能在V3版本中进行这些更改。

  • 我们向应用程序的许多地方添加了异步行为,特别是在应用程序对Azure服务总线的调用中。然而,应用程序还有其他地方仍然执行阻塞调用。我们可以把那些同步调用改成异步:例如,当系统访问数据存储时。此外,我们将使用新的语言特性,如async和await。
  • 通过采用存储转发设计,可以批量处理消息,并减少往返于数据存储的次数。例如,利用Azure服务总线会话将使我们能够从服务总线接收一个会话,从数据存储区读取多个条目、处理多个消息、一次保存到数据存储区,然后完成所有消息。

Markus(软件开发人员)发言:

通过接受一个服务总线会话,只要您保持锁,就只有一个会话的写入者和监听者。这减少了乐观并发异常。这种设计特别适合可用座位聚合的读和写模型。对于具有非常小分区的订单聚合关联的读模型,您可以从服务总线获取多个小会话,并在每个会话上使用存储转发方法。尽管系统中的读和写模型都可以从这种方法中受益,但是在我们期望数据最终是一致的、而不是完全一致的读模型中实现起来更容易。

  • 该网站已经缓存了一些经常访问的读模型数据,但是我们可以将缓存的使用扩展到系统的其他区域。CQRS模式意味着我们可以将缓存视为最终一致的读模型的一部分,如果需要,还可以使用不同的缓存或根本不使用缓存来访问来自系统不同部分的读模型数据。
  • 我们可以改进可用座位(SeatsAvailability)聚合的缓存快照实现。本章稍后将详细描述当前实现,其目的是始终检查事件存储,以查找在系统创建最新缓存快照之后到达的事件。当我们接收到要处理的新命令时,如果我们可以检查是否仍然使用与系统创建最新缓存快照时相同的服务总线会话,那么我们就可以知道事件存储中是否还有其他事件。如果会话没有更改,那么我们就知道自己是惟一的写入者,因此没有必要检查事件存储。如果会话已经更改,那么其他人可能已经将与聚合相关的事件写入到存储中,我们需要进行检查。
  • 应用程序当前使用相同的优先级监听所有服务总线订阅上的所有消息。在实践中,有些信息比其他信息更重要。因此,当应用程序处于压力之下时,我们应该优先处理一些消息,以最小化对核心应用程序功能的影响。例如,我们可以识别某些愿意接受更多延迟的读模型。

    Poe(IT运维人员)发言:

    我们还可以在负载增加时使用自动缩放来扩展应用程序(例如使用Autoscaling Application Block),但是添加新实例需要时间。通过确定某些消息类型的优先级,我们可以在自动缩放添加资源的同时,继续在应用程序的关键领域提供性能。

  • 当前实现使用随机生成的Guid作为存储在SQL数据库实例中的所有实体的键。当系统处于高负载下时,如果使用顺序Guid,特别是与聚集索引相关的Guid,它的性能可能会更好。有关顺序Guid的讨论,请参见The Cost of GUIDs as Primary Keys
  • 作为系统优化的一部分,我们现在在进程中处理一些命令,而不是通过服务总线发送它们。我们可以将此扩展到其他命令,并可能扩展到流程管理器。
  • 在当前实现中,流程管理器处理传入消息,然后存储库尝试同步发送传出消息(如果服务总线由于节流行为引发任何异常,则使用 Transient Fault Handling Application Block重试发送命令)。我们可以替代使用一种类似于EventStoreBusPublisher类的机制以让流程管理器保存一个消息列表,这些消息必须在一个事务里连同它的状态一起发送,然后通知系统的另一部分,这个部分的职责是当有一些新消息准备好要发送的时候负责来发送消息。

    Markus(软件开发人员)发言:

    负责发送消息的系统部分可以异步发送消息。它还可以为发送消息实现动态节流,并动态控制要使用多少个并行发送器。

  • 我们当前的事件存储实现是:为存储在事件存储里的每一个事件发布一个单独的,小的消息到消息总线上。我们可以将其中一些消息组合在一起,以减少服务总线上的I/O操作总数。例如,大型会议的可用座位(SeatsAvailability)聚合实例发布大量事件,订单(Order)聚合以突发方式发布事件(当创建订单(Order)聚合时,它同时发布OrderPlaced事件和OrderTotalsCalculated事件)。这还将有助于减少系统中的延迟,因为目前,在那些顺序很重要的场景中,我们必须在发送下一个事件之前等待一个事件已被发送的确认。将事件序列分组到一条消息中意味着我们不需要在发布单个事件之间等待确认。

增强可伸缩性的进一步更改

Contoso会议管理系统允许您部署web和工作者角色的多个实例,从而扩展应用程序以处理更大的负载。然而,该设计并不是完全可伸缩的,因为系统的其他一些元素,例如消息总线和数据存储对最大可实现的吞吐量有限制。本节概述了我们可以对系统进行的一些更改,以删除其中的一些约束,并显著提高系统的可伸缩性。这次旅程的可用时间有限,所以没能在V3版本中进行这些更改。

  • 数据分区:系统在不同的分区中存储不同类型的数据。在启动代码中,您可以看到不同的限界上下文如何使用不同的连接字符串连接到SQL数据库实例。但是,每个限界上下文目前使用一个SQL数据库实例,我们可以将其更改为使用多个不同的实例,每个实例都包含系统使用的特定数据集。例如,订单和注册限界上下文可以为不同的读取模型使用不同的SQL数据库实例。我们还可以考虑使用federations特性来使用分片扩展一些SQL数据库实例。

    “数据持久性是大多数可伸缩SaaS企业面临的最困难的技术问题。”
    -Evan Cooke, CTO, Twilio,Scaling High-Availability Infrastructure in the Cloud

    Jana(软件架构师)发言:

    在系统将数据存储在Azure表存储中的地方,我们选择用键对数据进行分区以实现可伸缩性。作为使用SQL数据库federations对数据进行切分的替代方法,我们可以将SQL数据库实例中当前的一些读模型数据移动到Azure表存储或blob存储中。

  • 进一步划分服务总线:通过为不同的事件发布者使用不同的Topic,我们已经对服务总线进行了划分,以避免在系统发送的消息量接近服务总线能够处理的最大吞吐量时进行节流。我们可以使用多个相似的Topic来进一步划分主题,并通过循环监听它们来分担负载。有关此方法的详细描述,请参见Abbott和Fisher在Scalability Rules: 50 Principles for Scaling Web Sites, (Addison-Wesley, 2011)中的第11章"Asynchronous Communication and Message Buses"
  • 存储和转发:我们在前面关于性能改进的小节中介绍了存储和转发设计。通过批处理多个操作,您不仅减少了到数据存储的往返次数,并减少了系统中的延迟,还增强了系统的可伸缩性,因为发出更少的请求可以减少对数据存储的压力。
  • 监听节流指示器并对其作出反应:目前,系统使用Transient Fault Handling Application Block来检测瞬时错误条件,比如从Azure服务总线、SQL数据库实例和Azure表存储中检测节流指示器。系统使用Block在这些场景中实现重试,通常使用指数回退策略。目前,我们在单个订阅级别使用动态节流,但是,我们希望修改它来对特定主题的所有订阅执行动态节流。类似地,我们希望在SQL数据库实例级和Azure存储帐户级实现动态节流。

    Jana(软件架构师)发言:

    在应用程序里实现动态节流的一个例子是从服务阻止节流,看EventStoreBusPublisher SubscriptionReceiver, SessionSubscriptionReceiver类是怎样使用DynamicThrottling类来管理他们所使用的并行程度来发送或接收消息的。

    Poe(IT运维人员)发言:

    每一个服务(Azure服务总线, SQL数据库,Azure storage)都有自己独特的方式来实现节流行为,并在负载过重时通知您。例如,请参见SQL Azure Throttling。重要的是要了解应用程序使用的不同服务可能会对您的应用程序造成的所有节流。

    Poe(IT运维人员)发言:

    团队还考虑使用Azure SQL数据库商业版来取代Azure SQL数据库Web版,但经过调查,我们确定目前版本之间的唯一区别是最大数据库大小。不同版本没有进行调优以支持不同类型的工作负载,而且两个版本实现了相同的节流行为。

有关可伸缩性的其他信息,请参阅:

在谈到可伸缩性和高可用性时,重要的是不要抱有错误的乐观态度。尽管使用许多建议的实践,应用程序往往可以更有效地伸缩,并且对失败更有弹性,但它们仍然容易出现高需求瓶颈。确保为性能测试和实现性能目标分配足够的时间。

不停机迁移

“我常说,任何冒险工作的三分之二都是做准备” Amelia Earhart

团队计划在Azure中进行从V2到V3版本的无停机迁移。为了实现这一点,迁移过程使用一个运行在Azure工作者角色中的特殊处理器来执行一些迁移步骤。

迁移过程仍然需要您完成一个配置步骤来关闭V2处理器并打开V3处理器。回想起来,我们应该使用一种不同的机制来简化从V2到V3处理器的转换,该转换基于处理程序本身的反馈,以指示它们何时完成了处理。

有关这些步骤的详细信息,请参见附录1“发布说明”。

Poe(IT运维人员)发言:

在生产环境中执行迁移之前,应该始终在测试环境中演练迁移。

重建读模型

在从V2迁移到V3期间,我们必须执行的步骤之一是通过重播事件日志中的事件来重新构建DraftOrderPricedOrder视图模型,以填充新的V3读模型表。我们可以异步执行此操作。然而,在某个时候,我们需要开始将事件从活动的应用程序发送到这些读模型。此外,我们需要保持这些读模型的V2和V3版本都是最新的,直到迁移过程完成,因为V2前端web角色需要V2的读取模型数据可用,直到切换到V3前端web角色。在切换到V3前端时,我们必须确保V3读取的模型完全是最新的。

为了使这些读取模型保持最新,我们创建了一个作为Azure工作者角色的临时处理器,它在迁移过程中运行。有关更多细节,请参阅会Conference解决方案中的MigrationToV3项目。该处理器执行的步骤是:

  • 创建一组新的Topic订阅,这些订阅将接收活动事件,这些活动事件将用于填充新的V3读模型。这些订阅将开始累积V3应用程序部署时将处理的事件。
  • 重播事件日志中的事件,用历史数据填充新的V3读取模型。
  • 处理活动事件并使V2的读模型保持最新,直到V3前端是活动的,此时我们不再需要V2的读模型。

迁移过程首先从事件存储中重播事件,以填充新的V3读模型。当这一切完成时,我们停止包含事件处理程序的V2处理器,并在V3处理器中启动新的处理程序。当它们运行并跟踪新Topic订阅中积累的事件时,ad-hoc处理器还使V2的读模型保持最新,因为此时我们仍然拥有V2前端。当V3工作者角色准备好时,我们可以执行一个VIP切换来使用新的V3前端。在V3前端运行之后,我们不再需要V2读模型。

使用这种方法要解决的问题之一是,如何确定新的V3处理器应该在什么时候从处理事件日志中的存档事件切换到处理实时的事件流。在将事件写入事件日志的过程中存在一些延迟,因此瞬时切换可能导致一些事件的丢失。团队决定允许V3处理器暂时可以同时处理存档事件和实时事件,这意味着可能会有重复的事件,相同的事件存在于事件存储区和由新订阅累积的事件列表中。但是,我们可以检测这些副本并相应地处理它们。

Markus(软件开发人员)发言:

通常,我们依赖于基础设施来检测重复的消息。在这个重复事件可能来自不同来源的特定场景中,我们不能依赖于基础设施,必须显式地将重复检测逻辑添加到代码中。

我们考虑的另一种方法是在V3处理器中同时包含V2和V3处理。使用这种方法,在迁移期间不需要一个特别的工作人员角色来处理V2事件。但是,我们决定将特定于迁移的代码保存在一个单独的项目中,以避免V3发行版由于包含只在迁移期间需要的功能而膨胀。

Jana(软件架构师)发言:

如果我们在V3处理器中同时包含V2和V3处理,迁移过程会稍微容易一些。但我们认为,这种方法的好处被不必在V3处理器中维护重复功能的好处所抵消。

迁移的每个步骤之间的间隔需要一些时间来完成,因此迁移不会导致停机,但是用户确实会遇到延迟。我们可以从处理切换开关的一些更快的机制中获益,比如停止V2处理器并启动V3处理器。

实现细节

本节描述订单和注册限界上下文的实现的一些重要功能。您可能会发现拥有一份代码拷贝很有用,这样您就可以继续学习了。您可以从Download center下载一个副本,或者在GitHub上查看存储库:https://github.com/mspnp/cqrs-journey-code。您可以从GitHub上的Tags页面下载V3版本的代码。

备注:不要期望代码示例与参考实现中的代码完全匹配。本章描述了CQRS过程中的一个步骤,随着我们了解更多并重构代码,实现可能会发生变化。

增强RegistrationProcessManager类

本节描述了团队如何通过检查SeatsReservedOrderPlaced消息的重复实例来强化RegistrationProcessManager流程管理器。

检测无序的SeatsReserved事件

通常,RegistrationProcessManager类向SeatAvailability聚合发送一个MakeSeatReservation命令,SeatAvailability聚合在进行预订时发布一个SeatsReserved事件,RegistrationProcessManager接收此通知。RegistrationProcessManager在创建订单和更新订单时都发送一条MakeSeatReservation命令。SeatsReserve事件到达的时候可能不是按顺序的,但是,系统应该尊重与最后发送的命令相关的事件。本节描述的解决方案使RegistrationProcessManager能够识别最新的SeatsReserved消息,然后忽略任何较早的消息,而不是重新处理它们。

RegistrationProcessManager类发送MakeSeatReservation命令之前,它将该命令的Id保存在SeatReservationCommandId变量中,如下面的代码示例所示:

public void Handle(OrderPlaced message)
{
    if (this.State == ProcessState.NotStarted)
    {
        this.ConferenceId = message.ConferenceId;
        this.OrderId = message.SourceId;
        // Use the order id as an opaque reservation id for the seat reservation. 
        // It could be anything else, as long as it is deterministic from the    
        // OrderPlaced event.
        this.ReservationId = message.SourceId;
        this.ReservationAutoExpiration = message.ReservationAutoExpiration;
        var expirationWindow = 
            message.ReservationAutoExpiration.Subtract(DateTime.UtcNow);

        if (expirationWindow > TimeSpan.Zero)
        {
            this.State = ProcessState.AwaitingReservationConfirmation;
            var seatReservationCommand =
                new MakeSeatReservation
                {
                    ConferenceId = this.ConferenceId,
                    ReservationId = this.ReservationId,
                    Seats = message.Seats.ToList()
                };
            this.SeatReservationCommandId = seatReservationCommand.Id;

            this.AddCommand(new Envelope<ICommand>(seatReservationCommand)
            {
                TimeToLive = expirationWindow.Add(TimeSpan.FromMinutes(1)),
            });

            ...
}

然后,当它处理SeatsReserved事件时,它检查该事件的CorrelationId属性是否匹配SeatReservationCommandId变量的最新值,如下面的代码示例所示:

public void Handle(Envelope<SeatsReserved> envelope)
{
    if (this.State == ProcessState.AwaitingReservationConfirmation)
    {
        if (envelope.CorrelationId != null)
        {
            if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) != 0)
            {
                // Skip this event.
                Trace.TraceWarning("Seat reservation response for reservation id {0} does not match the expected correlation id.", envelope.Body.ReservationId);
                return;
            }
        }

        ...
}

注意这个Handle方法如何处理Envelope实例而不是SeatsReserved实例。作为V3版本的一部分,事件被封装在一个包含CorrelationId属性的Envelope实例中。EventDispatcher中的DoDispatchMessage方法分配关联Id的值。

Markus(软件开发人员)发言:

作为添加此功能的副作用,EventProcessor类在将事件转发给处理程序时,不能再使用dynamic关键字。现在在V3中,它使用了新的EventDispatcher类,该类使用反射来标识给定消息类型的正确处理程序。

在性能测试期间,团队发现了这个特定的SeatsReserved事件的另一个问题。由于系统在加载时其他地方出现了延迟,因此第二份SeatsReserved事件被发布了。然后,这个Handle方法抛出一个异常,导致系统在将消息发送到dead-letter队列之前多次重试处理该消息。为了解决这个特定的问题,团队修改了这个方法,添加了else if子句,如下面的代码示例所示:

public void Handle(Envelope<SeatsReserved> envelope)
{
    if (this.State == ProcessState.AwaitingReservationConfirmation)
    {
        ...
    }
    else if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) == 0)
    {
        Trace.TraceInformation("Seat reservation response for request {1} for reservation id {0} was already handled. Skipping event.", envelope.Body.ReservationId, envelope.CorrelationId);
    }
    else
    {
        throw new InvalidOperationException("Cannot handle seat reservation at this stage.");
    }
}

Markus(软件开发人员)发言:

此优化仅应用于此特定消息。注意,它使用了之前保存在实例中的SeatReservationCommandId属性的值。如果希望对其他消息执行这种检查,则需要在流程管理器中存储更多信息。

检测重复的OrderPlaced事件

为了检测重复的OrderPlaced事件,RegistrationProcessManagerRouter类现在执行一个检查,以查看事件是否已经被处理。V3版本的新代码如下面的代码示例所示:

public void Handle(OrderPlaced @event)
{
    using (var context = this.contextFactory.Invoke())
    {
        var pm = context.Find(x => x.OrderId == @event.SourceId);
        if (pm == null)
        {
            pm = new RegistrationProcessManager();
        }

        pm.Handle(@event);
        context.Save(pm);
    }
}

当RegistrationProcessManager类保存状态并发送命令时创建伪事务

Azure中不可能有包含将RegistrationProcessManager持久化到存储里并发送命令的事务。因此,团队决定保存流程管理器生成的所有命令,以便在流程崩溃时不会丢失这些命令,它们可以稍后发送。我们使用另一个进程来可靠地处理发送命令。

Markus(软件开发人员)发言:

已经迁移到V3版本的迁移实用程序更新了数据库模式,以适应新的存储需求。

下面来自SqlProcessDataContext类的代码示例显示了系统如何持久化所有命令以及进程管理器的状态:

public void Save(T process)
{
    var entry = this.context.Entry(process);

    if (entry.State == System.Data.EntityState.Detached)
        this.context.Set<T>().Add(process);

    var commands = process.Commands.ToList();
    UndispatchedMessages undispatched = null;
    if (commands.Count > 0)
    {
        // If there are pending commands to send, we store them as undispatched.
        undispatched = new UndispatchedMessages(process.Id)
                            {
                                Commands = this.serializer.Serialize(commands)
                            };
        this.context.Set<UndispatchedMessages>().Add(undispatched);
    }

    try
    {
        this.context.SaveChanges();
    }
    catch (DbUpdateConcurrencyException e)
    {
        throw new ConcurrencyException(e.Message, e);
    }

    this.DispatchMessages(undispatched, commands);
}

下面来自SqlProcessDataContext类的代码示例展示了系统如何发送命令消息:

private void DispatchMessages(UndispatchedMessages undispatched, List<Envelope<ICommand>> deserializedCommands = null)
{
if (undispatched != null)
{
if (deserializedCommands == null)
{
deserializedCommands = this.serializer.Deserialize<IEnumerable<Envelope<ICommand>>>(undispatched.Commands).ToList();
}

var originalCommandsCount = deserializedCommands.Count;
try
{
while (deserializedCommands.Count > 0)
{
this.commandBus.Send(deserializedCommands.First());
deserializedCommands.RemoveAt(0);
}
}
catch (Exception)
{
// We catch a generic exception as we don't know what implementation of ICommandBus we might be using.
if (originalCommandsCount != deserializedCommands.Count)
{
// If we were able to send some commands, then update the undispatched messages.
undispatched.Commands = this.serializer.Serialize(deserializedCommands);
try
{
this.context.SaveChanges();
}
catch (DbUpdateConcurrencyException)
{
// If another thread already dispatched the messages, ignore and surface original exception instead.
}
}

throw;
}

// We remove all the undispatched messages for this process manager.
this.context.Set<UndispatchedMessages>().Remove(undispatched);
this.retryPolicy.ExecuteAction(() => this.context.SaveChanges());
}
}

DispatchMessages方法还从SqlProcessDataContext类中的Find方法调用,以便当系统重新还原(rehydrates)RegistrationProcessManager实例时,它会尝试发送任何未发送的消息。

优化UI流程

第一个优化是允许UI直接导航到注册者页面,前提是会议还有很多座位可用。RegistrationController类的StartRegistration方法介绍了这个变化,它现在会在创建预定并发送RegisterToConference命令之前执行一个额外的检查,确认有足够的剩余座位,如下面的代码示例所示:

[HttpPost]
public ActionResult StartRegistration(RegisterToConference command, int orderVersion)
{
    var existingOrder = orderVersion != 0 ? this.orderDao.FindDraftOrder(command.OrderId) : null;
    var viewModel = existingOrder == null ? this.CreateViewModel() : this.CreateViewModel(existingOrder);
    viewModel.OrderId = command.OrderId;

    if (!ModelState.IsValid)
    {
        return View(viewModel);
    }

    // Checks that there are still enough available seats, and the seat type IDs submitted are valid.
    ModelState.Clear();
    bool needsExtraValidation = false;
    foreach (var seat in command.Seats)
    {
        var modelItem = viewModel.Items.FirstOrDefault(x => x.SeatType.Id == seat.SeatType);
        if (modelItem != null)
        {
            if (seat.Quantity > modelItem.MaxSelectionQuantity)
            {
                modelItem.PartiallyFulfilled = needsExtraValidation = true;
                modelItem.OrderItem.ReservedSeats = modelItem.MaxSelectionQuantity;
            }
        }
        else
        {
            // Seat type no longer exists for conference.
            needsExtraValidation = true;
        }
    }

    if (needsExtraValidation)
    {
        return View(viewModel);
    }

    command.ConferenceId = this.ConferenceAlias.Id;
    this.commandBus.Send(command);

    return RedirectToAction(
        "SpecifyRegistrantAndPaymentDetails",
        new { conferenceCode = this.ConferenceCode, orderId = command.OrderId, orderVersion = orderVersion });
}

如果没有足够的可用座位,控制器将重新显示当前屏幕,显示当前可用的座位数量,以便注册者修改其订单。

更改的其余部分在RegistrationController类中的SpecifyRegistrantAndPaymentDetails方法中。下面来自V2版本的代码示例显示,在优化之前,控制器在继续跳转到注册页面之前调用WaitUntilSeatsAreConfirmed方法:

[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
    var order = this.WaitUntilSeatsAreConfirmed(orderId, orderVersion);
    if (order == null)
    {
        return View("ReservationUnknown");
    }

    if (order.State == DraftOrder.States.PartiallyReserved)
    {
        return this.RedirectToAction("StartRegistration", new { conferenceCode = this.ConferenceCode, orderId, orderVersion = order.OrderVersion });
    }

    if (order.State == DraftOrder.States.Confirmed)
    {
        return View("ShowCompletedOrder");
    }

    if (order.ReservationExpirationDate.HasValue && order.ReservationExpirationDate < DateTime.UtcNow)
    {
        return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
    }

    var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
    if (pricedOrder == null)
    {
        return View("ReservationUnknown");
    }

    this.ViewBag.ExpirationDateUTC = order.ReservationExpirationDate;

    return View(
        new RegistrationViewModel
        {
            RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
            Order = pricedOrder
        });
}

下面的代码示例显示了这个方法的V3版本,它不再等待预订被确认:

[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
    var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
    if (pricedOrder == null)
    {
        return View("PricedOrderUnknown");
    }

    if (!pricedOrder.ReservationExpirationDate.HasValue)
    {
        return View("ShowCompletedOrder");
    }

    if (pricedOrder.ReservationExpirationDate < DateTime.UtcNow)
    {
        return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
    }

    return View(
        new RegistrationViewModel
        {
            RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
            Order = pricedOrder
        });
}
备注:我们将在稍后的旅程中使这个方法异步。

UI流程的第二个优化是在流程的前面执行订单总数的计算。在上面的代码示例中,SpecifyRegistrantAndPaymentDetails方法仍然调用WaitUntilOrderIsPriced方法,这将暂停界面流直到系统计算出订单的总数并使其可用于控制器(在读端保存在priced-order视图模型中)。

实现此功能的关键变更是在订单(Order)聚合里。Order类中的构造函数现在调用CalculateTotal方法并引发OrderTotalsCalculated事件,如下面的代码示例所示:

public Order(Guid id, Guid conferenceId, IEnumerable<OrderItem> items, IPricingService pricingService)
    : this(id)
{
    var all = ConvertItems(items);
    var totals = pricingService.CalculateTotal(conferenceId, all.AsReadOnly());

    this.Update(new OrderPlaced
    {
        ConferenceId = conferenceId,
        Seats = all,
        ReservationAutoExpiration = DateTime.UtcNow.Add(ReservationAutoExpiration),
        AccessCode = HandleGenerator.Generate(6)
    });
    this.Update(new OrderTotalsCalculated { Total = totals.Total, Lines = totals.Lines != null ? totals.Lines.ToArray() : null, IsFreeOfCharge = totals.Total == 0m });
}

之前,在V2版本中,订单(Order)聚合一直等到收到MarkAsReserved命令才调用CalculateTotal方法。

异步接收、完成和发送消息

本节概述了系统现在如何异步地在Azure服务总线上执行所有I/O。

异步接收消息

SubscriptionReceiverSessionSubscriptionReceiver类现在异步接收消息,而不是在ReceiveMessages方法的循环中同步接收消息。

有关详细信息,请参阅SubscriptionReceiver类中的ReceiveMessages方法或SessionSubscriptionReceiver类中的ReceiveMessagesAndCloseSession方法。

Markus(软件开发人员)发言:

此代码示例还展示了如何使用Transient Fault Handling Application Block来可靠地异步接收来自服务总线Topic的消息。异步循环使代码更难以读取,但效率更高。这是推荐的最佳实践。这段代码将受益于c# 4中新的async关键字。

异步完成消息

系统使用peek/lock机制从服务总线Topic订阅中检索消息。要了解系统如何异步执行这些操作,请参阅SubscriptionReceiverSessionSubscriptionReceiver类中的ReceiveMessages方法。这提供了一个系统如何使用异步api的例子。

异步发送消息

应用程序现在异步发送服务总线上的所有消息。有关详细信息,请参见TopicSender类。

在进程中同步处理命令

在V2版本中,系统使用Azure服务总线将所有命令传递给它们的接收者。这意味着系统异步地交付命令。在V3版本中,MVC控制器现在同步地在进程中发送命令,以便通过绕过命令总线并将命令直接传递给处理程序来改进UI中的响应时间。此外,在ConferenceProcessor工作者角色中,发送到订单(Order)聚合的命令使用相同的机制在进程中同步发送。

Markus(软件开发人员)发言:

我们仍然异步地向可用座位(SeatsAvailability)聚合发送命令,因为随着RegistrationProcessManager的多个实例并行运行,将会出现争用,因为多个线程都试图访问可用座位(SeatsAvailability)聚合的同一个实例。

团队实现这种行为通过添加SynchronousCommandBusDecoratorCommandDispatcher类到基础设施并且在web角色启动的时候注册它们,如下面的代码展示了Global.asax.Azure.cs文件里的OnCreateContainer方法**:

var commandBus = new CommandBus(new TopicSender(settings.ServiceBus, "conference/commands"), metadata, serializer);
var synchronousCommandBus = new SynchronousCommandBusDecorator(commandBus);

container.RegisterInstance<ICommandBus>(synchronousCommandBus);
container.RegisterInstance<ICommandHandlerRegistry>(synchronousCommandBus);


container.RegisterType<ICommandHandler, OrderCommandHandler>("OrderCommandHandler");
container.RegisterType<ICommandHandler, ThirdPartyProcessorPaymentCommandHandler>("ThirdPartyProcessorPaymentCommandHandler");
container.RegisterType<ICommandHandler, SeatAssignmentsHandler>("SeatAssignmentsHandler");
备注:在Conference.Azure.cs文件中也有类似的代码,用于配置工作角色,以便在进程中发送一些命令。

下面的代码示例展示了SynchronousCommandBusDecorator类如何实现命令消息的发送:

public class SynchronousCommandBusDecorator : ICommandBus, ICommandHandlerRegistry
{
    private readonly ICommandBus commandBus;
    private readonly CommandDispatcher commandDispatcher;

    public SynchronousCommandBusDecorator(ICommandBus commandBus)
    {
        this.commandBus = commandBus;
        this.commandDispatcher = new CommandDispatcher();
    }

    ...

    public void Send(Envelope<ICommand> command)
    {
        if (!this.DoSend(command))
        {
            Trace.TraceInformation("Command with id {0} was not handled locally. Sending it through the bus.", command.Body.Id);
            this.commandBus.Send(command);
        }
    }

    ...

    private bool DoSend(Envelope<ICommand> command)
    {
        bool handled = false;

        try
        {
            var traceIdentifier = string.Format(CultureInfo.CurrentCulture, " (local handling of command with id {0})", command.Body.Id);
            handled = this.commandDispatcher.ProcessMessage(traceIdentifier, command.Body, command.MessageId, command.CorrelationId);

        }
        catch (Exception e)
        {
            Trace.TraceWarning("Exception handling command with id {0} synchronously: {1}", command.Body.Id, e.Message);
        }

        return handled;
    }
}

注意这个类是如何尝试在不使用服务总线的情况下同步发送命令,但是如果它找不到该命令的处理程序,它将返回到使用服务总线。下面的代码示例展示了CommandDispatcher类如何试图定位处理程序并传递命令消息:

public bool ProcessMessage(string traceIdentifier, ICommand payload, string messageId, string correlationId)
{
    var commandType = payload.GetType();
    ICommandHandler handler = null;

    if (this.handlers.TryGetValue(commandType, out handler))
    {
        Trace.WriteLine("-- Handled by " + handler.GetType().FullName + traceIdentifier);
        ((dynamic)handler).Handle((dynamic)payload);
        return true;
    }
    else
    {
        return false;
    }
}

使用memento模式实现快照

在Contoso会议管理系统中,唯一有事件源的聚合就是可用座位(SeatAvailability)聚合。它可能包含大量的事件并且可以从快照中获益。

Markus(软件开发人员)发言:

因为我们选择使用memento模式,所以聚合状态的快照存储在memento中。

下面的代码示例来自AzureEventSourcedRepository类中的Save方法,展示了如果存在缓存且聚合实现了IMementoOriginator接口,系统如何创建缓存的memento对象。

public void Save(T eventSourced, string correlationId)
{
    ...

    this.cacheMementoIfApplicable.Invoke(eventSourced);
}

然后,当系统调用AzureEventSourcedRepository类中的Find方法加载聚合时,它会检查是否有缓存的memento其中包含要使用对象状态的快照:

private readonly Func<Guid, Tuple<IMemento, DateTime?>> getMementoFromCache;

...

public T Find(Guid id)
{
var cachedMemento = this.getMementoFromCache(id);
if (cachedMemento != null && cachedMemento.Item1 != null)
{
IEnumerable<IVersionedEvent> deserialized;
if (!cachedMemento.Item2.HasValue || cachedMemento.Item2.Value < DateTime.UtcNow.AddSeconds(-1))
{
deserialized = this.eventStore.Load(GetPartitionKey(id), cachedMemento.Item1.Version + 1).Select(this.Deserialize);
}
else
{
deserialized = Enumerable.Empty<IVersionedEvent>();
}

return this.originatorEntityFactory.Invoke(id, cachedMemento.Item1, deserialized);
}
else
{
var deserialized = this.eventStore.Load(GetPartitionKey(id), 0)
.Select(this.Deserialize)
.AsCachedAnyEnumerable();

if (deserialized.Any())
{
return this.entityFactory.Invoke(id, deserialized);
}
}

return null;
}

如果缓存条目在最近几秒钟内更新了,那么它很有可能没有过期,因为我们只有一个写入者用于高争用聚合。因此,当memento创建之后,我们乐观的不用在事件存储中检查新事件。否则,我们需要在事件存储中检查创建memento之后到达的事件。

下面的代码示例显示了SeatsAvailability类如何将其状态数据快照添加到要缓存的memento对象中:

public IMemento SaveToMemento()
{
    return new Memento
    {
        Version = this.Version,
        RemainingSeats = this.remainingSeats.ToArray(),
        PendingReservations = this.pendingReservations.ToArray(),
    };
}

并行发布事件

在第5章“准备发布V1版本”中,您了解了系统如何在将事件保存到事件存储时发布事件。这种优化使系统能够并行发布其中一些事件,而不是按顺序发布。重要的是,与特定聚合实例关联的事件必须按照正确的顺序发送,因此系统只为不同的分区键创建新任务。下面的代码示例来自EventStoreBusPublisher类中的Start方法,展示了如何定义并行任务:

Task.Factory.StartNew(
    () =>
    {
        try
        {
            foreach (var key in GetThrottlingEnumerable(this.enqueuedKeys.GetConsumingEnumerable(cancellationToken), this.throttlingSemaphore, cancellationToken))
            {
                if (!cancellationToken.IsCancellationRequested)
                {
                    ProcessPartition(key);
                }
                else
                {
                    this.enqueuedKeys.Add(key);
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            return;
        }
    },
    TaskCreationOptions.LongRunning);

SubscriptionReceiverSessionSubscriptionReceiver类使用相同的DynamicThrottling类来动态限制从服务总线检索消息的速度。

在订阅中过滤消息

团队向Azure服务总线订阅添加了过滤器,以将每个订阅接收到的消息限制为订阅打算处理的消息。您可以在Settings.Template.xml文件中看到这些过滤器的定义,如下面的代码片段所示:

<Topic Path="conference/events" IsEventBus="true">
  <Subscription Name="log" RequiresSession="false"/>
  <Subscription Name="Registration.RegistrationPMOrderPlaced" RequiresSession="false" SqlFilter="TypeName IN ('OrderPlaced')"/>
  <Subscription Name="Registration.RegistrationPMNextSteps" RequiresSession="false" SqlFilter="TypeName IN ('OrderUpdated','SeatsReserved','PaymentCompleted','OrderConfirmed')"/>
  <Subscription Name="Registration.OrderViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderUpdated','OrderPartiallyReserved','OrderReservationCompleted','OrderRegistrantAssigned','OrderConfirmed','OrderPaymentConfirmed')"/>
  <Subscription Name="Registration.PricedOrderViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderTotalsCalculated','OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatCreated','SeatUpdated')"/>
  <Subscription Name="Registration.ConferenceViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('ConferenceCreated','ConferenceUpdated','ConferencePublished','ConferenceUnpublished','SeatCreated','SeatUpdated','AvailableSeatsChanged','SeatsReserved','SeatsReservationCancelled')"/>
  <Subscription Name="Registration.SeatAssignmentsViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('SeatAssignmentsCreated','SeatAssigned','SeatUnassigned','SeatAssignmentUpdated')"/>
  <Subscription Name="Registration.SeatAssignmentsHandler" RequiresSession="true" SqlFilter="TypeName IN ('OrderConfirmed','OrderPaymentConfirmed')"/>
  <Subscription Name="Conference.OrderEventHandler" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderRegistrantAssigned','OrderTotalsCalculated','OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatAssigned','SeatAssignmentUpdated','SeatUnassigned')"/>

  ...
</Topic>

为可用座位(SeatsAvailability)聚合创建一个专用的SessionSubscriptionReceiver实例

在V2版本中,系统没有为命令使用会话,因为我们不需要命令的顺序保证。然而,我们现在希望为命令使用会话来保证每个可用座位(SeatsAvailability)聚合实例都有一个监听者,这将在不从这个高争用聚合中获得大量并发异常的情况下帮助我们进行扩展。

Conference.Processor.Azure.cs文件中的以下代码示例显示了系统如何创建一个专用的SessionSubscriptionReceiver实例来接收发送到可用座位(SeatsAvailability)聚合的消息:

var seatsAvailabilityCommandProcessor =
    new CommandProcessor(new SessionSubscriptionReceiver(azureSettings.ServiceBus, Topics.Commands.Path, Topics.Commands.Subscriptions.SeatsAvailability, false), serializer);

...

container.RegisterInstance<IProcessor>("SeatsAvailabilityCommandProcessor", seatsAvailabilityCommandProcessor);

下面的代码示例显示了新的抽象SeatsAvailabilityCommand类,其中包含一个基于与该命令关联的会议的会话ID:

public abstract class SeatsAvailabilityCommand : ICommand, IMessageSessionProvider
{
    public SeatsAvailabilityCommand()
    {
        this.Id = Guid.NewGuid();
    }

    public Guid Id { get; set; }
    public Guid ConferenceId { get; set; }

    string IMessageSessionProvider.SessionId
    {
        get { return "SeatsAvailability_" + this.ConferenceId.ToString(); }
    }
}

命令总线现在使用一个单独的订阅来订阅为可用座位(SeatsAvailability)聚合指定的命令。

Markus(软件开发人员)发言:

团队对RegistrationProcessManager流程管理器应用了类似的技术,为OrderPlaced事件创建单独的订阅来处理新订单。一个单独的订阅接收指定给流程管理器的所有其他事件。

缓存读模型的数据

作为V3版本中性能优化的一部分,团队为存储在订单和注册限界上下文读模型中的会议信息添加了缓存行为。这减少了读取这些常用数据所花费的时间。

下面的代码示例来自CachingConferenceDao类中的GetPublishedSeatTypes方法,展示了系统如何根据可用座位的数量决定是否缓存会议数据:

TimeSpan timeToCache;
if (seatTypes.All(x => x.AvailableQuantity > 200 || x.AvailableQuantity <= 0))
{
    timeToCache = TimeSpan.FromMinutes(5);
}
else if (seatTypes.Any(x => x.AvailableQuantity < 30 && x.AvailableQuantity > 0))
{
    // There are just a few seats remaining. Do not cache.
    timeToCache = TimeSpan.Zero;
}
else if (seatTypes.Any(x => x.AvailableQuantity < 100 && x.AvailableQuantity > 0))
{
    timeToCache = TimeSpan.FromSeconds(20);
}
else
{
    timeToCache = TimeSpan.FromMinutes(1);
}

if (timeToCache > TimeSpan.Zero)
{
    this.cache.Set(key, seatTypes, new CacheItemPolicy { AbsoluteExpiration = DateTimeOffset.UtcNow.Add(timeToCache) });
}

Jana(软件架构师)发言:

您可以看到,通过调整缓存时间,甚至决定根本不缓存数据,我们是如何管理“显示陈旧数据”相关的风险。

系统现在还使用缓存来保存PricedOrderViewModelGenerator类中的座位类型描述。

使用多个Topic划分服务总线

为了减少流经服务总线Topic的消息数量,我们创建了两个附加主题来传输订单(Order)和可用座位(SeatAvailability)聚合发布的事件,从而对服务总线进行了分区。这有助于我们避免在应用程序承受非常高的负载时被服务总线节流。Settings.xml文件中的以下片段展示了这些新Topic的定义:

Topic Path="conference/orderevents" IsEventBus="true">
  <Subscription Name="logOrders" RequiresSession="false"/>
  <Subscription Name="Registration.RegistrationPMOrderPlacedOrders" RequiresSession="false"
    SqlFilter="TypeName IN ('OrderPlaced')"/>
  <Subscription Name="Registration.RegistrationPMNextStepsOrders" RequiresSession="false"
    SqlFilter="TypeName IN ('OrderUpdated','SeatsReserved','PaymentCompleted','OrderConfirmed')"/>
  <Subscription Name="Registration.OrderViewModelGeneratorOrders" RequiresSession="true"
    SqlFilter="TypeName IN ('OrderPlaced','OrderUpdated','OrderPartiallyReserved','OrderReservationCompleted',
    'OrderRegistrantAssigned','OrderConfirmed','OrderPaymentConfirmed')"/>
  <Subscription Name="Registration.PricedOrderViewModelOrders" RequiresSession="true"
    SqlFilter="TypeName IN ('OrderPlaced','OrderTotalsCalculated','OrderConfirmed',
    'OrderExpired','SeatAssignmentsCreated','SeatCreated','SeatUpdated')"/>
  <Subscription Name="Registration.SeatAssignmentsViewModelOrders" RequiresSession="true"
    SqlFilter="TypeName IN ('SeatAssignmentsCreated','SeatAssigned','SeatUnassigned','SeatAssignmentUpdated')"/>
  <Subscription Name="Registration.SeatAssignmentsHandlerOrders" RequiresSession="true"
    SqlFilter="TypeName IN ('OrderConfirmed','OrderPaymentConfirmed')"/>
  <Subscription Name="Conference.OrderEventHandlerOrders" RequiresSession="true"
    SqlFilter="TypeName IN ('OrderPlaced','OrderRegistrantAssigned','OrderTotalsCalculated',
    'OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatAssigned','SeatAssignmentUpdated','SeatUnassigned')"/>
</Topic>
<Topic Path="conference/availabilityevents" IsEventBus="true">
  <Subscription Name="logAvail" RequiresSession="false"/>
  <Subscription Name="Registration.RegistrationPMNextStepsAvail" RequiresSession="false"
    SqlFilter="TypeName IN ('OrderUpdated','SeatsReserved','PaymentCompleted','OrderConfirmed')"/>
  <Subscription Name="Registration.PricedOrderViewModelAvail" RequiresSession="true"
    SqlFilter="TypeName IN ('OrderPlaced','OrderTotalsCalculated','OrderConfirmed',
    'OrderExpired','SeatAssignmentsCreated','SeatCreated','SeatUpdated')"/>
  <Subscription Name="Registration.ConferenceViewModelAvail" RequiresSession="true"
    SqlFilter="TypeName IN ('ConferenceCreated','ConferenceUpdated','ConferencePublished',
    'ConferenceUnpublished','SeatCreated','SeatUpdated','AvailableSeatsChanged',
    'SeatsReserved','SeatsReservationCancelled')"/>
</Topic>

其他的优化和坚固性更改

本节概述了团队优化应用程序性能和提高其弹性的一些额外方法:

  • 使用顺序GUIDs。
  • 使用异步ASP.NET MVC controllers。
  • 使用预取从服务总线获取多个消息。
  • 并行接受多个Azure服务总线会话。
  • 使座位预订命令过期。

顺序GUIDs

在此之前,系统生成Guid,用于聚合的Id,例如订单和注册聚合。使用Guid.NewGuid方法,它生成随机Guid。如果在SQL数据库实例中使用这些Guid作为主键值,这将导致索引中频繁的页面分割,从而对数据库的性能产生负面影响。在V3版本中,团队添加了一个实用程序类来生成连续的Guid。这确保SQL数据库表中的新条目总是追加在后面,这提高了数据库的整体性能。下面的代码示例显示了新的GuidUtil类:

public static class GuidUtil
{
private static readonly long EpochMilliseconds = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks / 10000L;

/// <summary>
/// Creates a sequential GUID according to SQL Server's ordering rules.
/// </summary>
public static Guid NewSequentialId()
{
// This code was not reviewed to guarantee uniqueness under most conditions, nor completely optimize for avoiding
// page splits in SQL Server when doing inserts from multiple hosts, so do not re-use in production systems.
var guidBytes = Guid.NewGuid().ToByteArray();

// Get the milliseconds since Jan 1 1970.
byte[] sequential = BitConverter.GetBytes((DateTime.Now.Ticks / 10000L) - EpochMilliseconds);

// Discard the 2 most significant bytes, as we only care about the milliseconds increasing, but the highest ones should be 0 for several thousand years to come.
if (BitConverter.IsLittleEndian)
{
guidBytes[10] = sequential[5];
guidBytes[11] = sequential[4];
guidBytes[12] = sequential[3];
guidBytes[13] = sequential[2];
guidBytes[14] = sequential[1];
guidBytes[15] = sequential[0];
}
else
{
Buffer.BlockCopy(sequential, 2, guidBytes, 10, 6);
}

return new Guid(guidBytes);
}
}

有关进一步信息,请参见The Cost of GUIDs as Primary KeysGood Page Splits and Sequential GUID Key Generation

异步ASP.NET MVC controllers

团队将公共会议web应用程序中的一些MVC控制器转换为异步控制器。这避免了阻塞一些ASP.NET线程并使我们能够在里面使用Task类。

例如,团队修改了控制器在读模型中使用计时器的轮询更新方式。

为服务总线采用预读取

当系统从Azure服务总线检索消息时,团队启用了prefetch选项。此选项使系统能够在一次到服务器的往返中检索多个消息,并有助于减少从服务总线Topic检索现有消息的延迟。

下面来自SubscriptionReceiver类的代码示例展示了如何启用此选项。

protected SubscriptionReceiver(ServiceBusSettings settings, string topic, string subscription, bool processInParallel, ISubscriptionReceiverInstrumentation instrumentation, RetryStrategy backgroundRetryStrategy)
{
    this.settings = settings;
    this.topic = topic;
    this.subscription = subscription;
    this.processInParallel = processInParallel;

    this.tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(settings.TokenIssuer, settings.TokenAccessKey);
    this.serviceUri = ServiceBusEnvironment.CreateServiceUri(settings.ServiceUriScheme, settings.ServiceNamespace, settings.ServicePath);

    var messagingFactory = MessagingFactory.Create(this.serviceUri, tokenProvider);
    this.client = messagingFactory.CreateSubscriptionClient(topic, subscription);
    if (this.processInParallel)
    {
        this.client.PrefetchCount = 18;
    }
    else
    {
        this.client.PrefetchCount = 14;
    }

    ...
}

并行接收多个会话

在V2版本中,SessionSubscriptionReceiver创建会话来依次接收来自Azure服务总线的消息。但是,如果您正在使用会话,则只能处理来自该会话的消息。在切换到另一个会话之前,其他消息将被忽略。在V3版本中,SessionSubscriptionReceiver并行创建多个会话,使系统能够同时接收来自多个会话的消息。

有关详细信息,请参见SessionSubscriptionReceiver类中的AcceptSession方法。

Markus(软件开发人员)发言:

AcceptSession方法使用Transient Fault Handling Application Block来可靠地接收会话。

添加一个乐观并发性检查

当系统通过向RegistrationProcessManager类添加时间戳属性来保存RegistrationProcessManager类时,团队还添加了一个乐观并发性检查,如下面的代码示例所示:

[ConcurrencyCheck]
[Timestamp]
public byte[] TimeStamp { get; private set; }

有关更多信息,请参见MSDN网站上的Code First Data Annotations

在进行了乐观并发性检查之后,我们还删除了SessionSubscriptionReceiver类中的锁,这是系统中潜在的瓶颈。

给MakeSeatReservation命令添加一个time-to-live值

Azure服务总线代理消息可以为TimeToLive属性分配一个值,当time-to-live过期时,消息将自动发送到dead-letter队列。如果与MakeSeatReservation命令关联的订单已经过期,应用程序将使用服务总线的这个特性来避免处理MakeSeatReservation命令。

减少到数据库的往返次数

我们在PricedOrderViewModelGenerator类中标识了许多位置,可以在这些位置优化代码。以前,当这个类处理正在预定或过期的订单时,系统对SQL数据库实例进行两次调用,现在系统只发起一个调用。

对测试的影响

在旅程的这个阶段,团队重新组织了Conference.Specflow项目,以更好地反映测试的目的。

集成测试

Conference.Specflow项目中的Features\Integration文件夹中的测试旨在直接测试领域的行为,通过查看发送和接收的命令和事件来验证领域的行为。这些测试的目的是让程序员而不是领域专家能够理解,并使用比普遍使用的语言更专业的词汇表来表示。除了验证领域的行为并帮助开发人员理解系统中的命令流和事件流之外,这些测试还被证明对于在事件丢失或接收顺序错误的场景中测试领域的行为非常有用。

Conference文件夹包含会议管理限界上下文的集成测试,而Registration文件夹包含订单和注册限界上下文的测试。

Markus(软件开发人员)发言:

这些集成测试假定命令处理程序信任命令的发送者发送的是有效的命令消息。这一点可能不适用于正在设计测试的其他系统。

UI测试

UserInterface文件夹包含验收测试。这些测试在第4章“扩展和增强订单和注册限界上下文”有更详细的描述。Controllers文件夹包含使用MVC控制器作为入口点的测试,Views文件夹包含使用WatiN通过其UI驱动系统的测试。

总结

在我们的CQRS旅程和V3伪产品发布的最后阶段中,重点是弹性和性能。下一章总结了我们在整个旅程中所学到的教训,并提出了一些建议,如果我们有机会重新开始我们所学到的知识,我们可能会做得有所不同。