ENode框架Conference案例分析系列之 - 事件溯源如何处理重构问题

时间:2021-12-13 03:44:02

前言

本文可能对大多数不太了解ENode的朋友来说,理解起来比较费劲,这篇文章主要讲思路,而不是一上来就讲结果。我写文章,总是希望能把自己的思考过程尽量能表达出来,能让大家知道每一个设计背后的思考的东西。我觉得,任何设计的结果可能看起来很高大上,一张图即可,但背后的思考,才是更有价值的东西。

本篇文章想写一下ENode如何处理由于业务需求的变化而导致的模型重构的问题。DDD之所以能解决复杂的业务问题是因为DDD是一种模型驱动的软件设计方法。用领域模型来捕获业务需求,根据业务需求,抽象出满足需求的领域模型,并能让模型随着业务需求的不断变化而不断跟着精炼、演进。我想,归纳起来,对领域模型的重构修改主要为以下四种情况:

  1. 只需要修改业务逻辑而不需要修改模型结构,比如只是修改某个业务规则的判断逻辑;
  2. 需要对模型结构做一些小的改动,比如新增、改名、删除一个属性;
  3. 需要对模型做大手术,比如需要将原来的一个聚合拆分为两个独立的聚合;比如电子商务系统中,原本商品的库存信息可能在商品聚合根上,但后来由于库存管理的复杂,需要把库存独立到新的上下文(库存上下文),并独立维护;
  4. 还有一些情况可能会出现把一个聚合根降级为另一个聚合根下的子实体,或者把一个实体升级为一个独立聚合根的情况。这种情况我个人认为一般不会出现,如果出现,一般是原来的领域模型设计出现了大问题导致,而这种情况一般也可以通过新增独立聚合根来解决;

传统应用面对以上重构的处理方式

传统的应用是指没有使用Event Sourcing(ES),比如经典三层架构或经典DDD的四层架构,这些架构通过数据库存放所有业务数据的最新状态。这种架构的处理方式是:

  1. 只修改业务逻辑的情况:直接修改逻辑即可,数据库不需要做修改;
  2. 新增、改名、删除模型属性的情况:对于新增属性的情况,我们只需要在db对应的表中新增字段,然后新增的字段使用默认值即可;对于属性改名的情况,我觉得只需要在代码层面修改属性的名称即可,而DB中表字段的名称无需修改(如果要改字段名,就是大工程了);对于删除模型属性的情况,一般也只需要修改代码即可,DB表中的字段无需删除,或者你一定要删除也无影响;
  3. 对于一个聚合根拆分为两个聚合根的情况:处理比较复杂,一般需要通过新建表、数据迁移、代码逻辑切换几个步骤;
  4. 对于聚合根降级或升级的情况:有时比较简单,因为虽然代码层面,聚合之间的关系改变了,但DB层面,表的结构和关系并没有改变(一般只要我们的数据库表设计时是面向第三范式的,那数据库往往是稳定的)。这种情况,比较简单,直接重构代码即可;但是有时可能出现也需要对表结构做修改甚至大改的情况,这种情况,也需要要像3一样的处理方式;

对于3、4两种场景中需要做数据迁移和切换的情况,一般都是系统需要大改了,一般不会经常发生。如果出现了,那我们要坐下来,根据实际的情况,好好想想要怎么解决。架构师要设计好代码重构方案、数据迁移方案,以及数据访问的切换方案。如果切换失败,还要支持快速的回滚;总之,这是一个大工程。好在我们现在已经有一些成熟的甚至自动化的数据迁移方案(可复用的)或工具,可以帮助我们极大的简化重构的改造成本。

使用ES(Event Sourcing,事件溯源)的架构的处理方式

使用ES的系统,应该承认,重构时要面对的问题要比传统的应用要复杂很多。比如,数据库中除了存储了聚合根的最新状态,还多了一个EventStore,EventStore中存储了每个聚合根产生的所有历史事件。另外,由于是EDA(事件驱动架构),所以,数据不仅仅在DB中,还有一部分消息(command或event)还在队列里,还没有被处理。所以,我们还要处理这些还留在队列里的消息。所以,ES架构的应用,在数据迁移或切换时,除了要迁移CQRS读库中的聚合根的最新状态的数据,还要考虑额外的两个东西:1)EventStore中事件的迁移;2)Queue中未被处理的消息的处理;所以,ES架构的应用,在处理这种大改动时,要做的方案可能比传统的要复杂很多。这也是我想写本文的目的,希望通过一个虚构的例子(实际生活中也可能会发生),来分析ES架构下,我们怎么处理这种情况,希望能给未来使用ES架构开发应用的朋友提供一些借鉴意义。

然后针对上面的4种重构的情况,1比较简单,也只需要改动产生事件的地方的逻辑即可;2也比较简单,只需要修改对应的事件,在事件类中增加或删除属性即可,反正JSON序列化和反序列化都是兼容的;3,4两种情况比较复杂,我们数据迁移时要考虑的东西比传统的架构要多。我上面已经提到了。

拆分聚合根的例子

经过前面几篇文章的介绍,我们知道conference案例中,conferenceManagement上下文(会议管理上下文)负责管理会议的基本信息和会议的所有座位的库存信息。但是假设我们未来随着库存管理的复杂,希望把库存管理的逻辑剥离到独立的上下文,会议座位库存管理上下文(Bounded Context, BC)。这种情况,相当于把原来的conference聚合根拆分为两个聚合根,原conference聚合根只负责维护会议的基本信息,不再维护座位的库存信息;然后库存信息交给一个独立的聚合根(conferenceSeatInventory)维护。一个conference聚合根会有一个唯一的conferenceSeatInventory聚合根对应,但conferenceSeatInventory聚合根的ID可以独立,然后conference聚合根的ID可以作为conferenceSeatInventory聚合根的一个属性。就像Payment聚合根上有一个OrderId一样,呵呵。这样拆分后,我们的会议管理的上下文只需要关注会议本身的基本信息,包括UI的设计也是一样。然后专门有库存管理的系统(包括UI),负责管理会议的座位的库存信息。

这个拆分聚合根的场景是我假想出来的,实际也许并不会发生,也许会发生,取决于我们的业务如何发展。就像阿里的库存中心也许最早的时候并没有独立出来,而是和商品中心在一起的。但后来随着对库存管理的重视或者库存管理(尤其是减库存)的本身业务的不断复杂,我们把库存管理独立出来了。

模型重构

在做数据迁移之前,我们先要把新的库存管理的上下文做好并发布上线。但是此时,这个上下文还不会有什么命令过来。

数据迁移和切换的思路

使用ENode开发的应用程序是一个CQRS+EDA+ES的架构。然后,我们也知道了,这种架构下,我们要迁移的数据有以下三种:

  1. EventStore中的事件;
  2. CQRS读库中的数据;
  3. 分布式队列(EQueue)中还没有被消费的消息:命令、事件;

我的思路是:

  1. 首先把数据迁移的影响范围降低到最小。所以,我们可以先把所有目前和conference聚合根的库存变化相关的命令和事件消息隔离到独立的队列中去。这个事情很简单,因为ENode目前使用的分布式消息队列是EQueue,而EQueue发送消息或订阅消息都是基于Topic的。当我们要发送一个消息时,需要告诉EQueue当前要发送到哪个Topic下的哪个Queue;当我们要订阅消息时,也是告诉EQueue要订阅哪些Topic。所以,我们可以为conference聚合根和库存变化相关的命令、事件配置独立的topic即可(比如叫ChangeConferenceInventoryCommandTopic,ConferenceInventoryChangedEventTopic)。这样一来,我们只需要从这两个独立的topic着手进行数据迁移即可。
  2. 新增一个独立的Event Handler,它负责订阅conference聚合根的库存变化相关的事件,即订阅topic为ConferenceInventoryChangedEventTopic的事件。它的处理逻辑是根据事件,创建并持久化新的和新的聚合根(conferenceSeatInventory)对应的库存修改事件到EventStore。但是这里的情况比较复杂,我需要详细展开下。比如当前我们收到一个事件(事件版本号可能为10),然后发现这个事件对应的conference聚合根还没有对应的conferenceSeatInventory对应的任何事件,因为是第一次同步。那此时我们需要把conference聚合根的所有以前的历史事件(版本号为1到10的所有事件)都从EventStore中拿出来,然后把和库存变化相关的事件转换为对应的新的事件,然后把这些新的事件全部保存到EventStore中。这里有一个问题,就是我们怎么知道当前聚合根的之前的事件是否都已经同步好了呢?一个简单的办法是,每次一个事件过来,我们都从EventStore中查询当前是否已经把这个事件所有之前的事件都同步过了,如果是,那就可以直接同步当前这个事件了。如果不是,那就需要先把之前的事件同步完成。但是这样无疑是低效的。实际上我们可以在当前机器的内存中保存每个聚合根当前已经同步了的事件的版本号。也就是用一个ConcurrentDictionary来保存每个聚合根的已同步事件的版本号。然后,判断时,只需要根据这个字典进行判断即可,这样无疑效率提高很多。但是为何只需要把同步进度缓存在当前机器的内存中呢?为何不是采用分布式缓存呢?因为ENode中的所有的命令或事件消息在路由时,都是根据聚合根ID进行路由的。同一个聚合根ID的消息都是总是被路由到同一个Queue的。而根据EQueue的架构,同一个Queue的消费者总是同一个。所以,我们可以这样做。
  3. 上面的Event Handler只处理了conference聚合根产生的新事件。但是那些以前产生过库存修改事件,但是近期没产生过的聚合根怎么办呢?我们需要通过开发一个事件同步任务(Event Sync Task),该任务扫描EventStore中所有conference聚合根的所有和库存修改相关的事件,并把这些事件转换为新的聚合根的库存修改的事件。可以很容易发现,这个任务和上面的Event Handler可能会出现并发冲突。这种并发冲突是预期之内的,我们可以通过技术手段来解决。比如我们可以通过以被同步的conference聚合根的事件的版本号来做乐观并发控制。当目标的EventStore中持久化同一个目标聚合根(conferenceSeatInventory)的相同版本的事件时,就抛出ConcurrentException即可。然后我们代码中,当遇到这种并发异常时,只需要查询最新以同步的事件版本,然后尝试同步这个事件版本之后的事件即可。这里有一个问题需要提一下,就是EventStore数据库的Events表可能存放的是所有的聚合根的所有的事件,这样这个Events表中的数据是非常多的。所以,我们通常在设计EventStore的Events表时,尽量先做一层业务层面的垂直拆分。即把不同类型的聚合根所产生的事件隔离存储,比如conference聚合根的事件都放在conferenceEvents表里;payment聚合根的所有事件都放在paymentEvents表里。这样一来,我们的事件表里的记录就不会那么多了。但是即便是一个聚合根的所有事件,可能也是非常多的。那我们就需要采用水平分割了,即把同一个聚合根的所有的事件再进行分库分表。这个是题外话了,这本只是提一下。我们这里的目的是关注在同步已有的conferenceEvents表中的所有的事件,将这些事件转换为新的聚合根的事件,并持久化到新的事件表中。惊喜:写到这里突然发现,有着这个事件同步的任务,那上面这个Event Handler就可以不需要了哦,因为我本来计划这个同步任务只负责一次性同步所有的历史事件,而不同步后续新增的事件的。但是通过我刚才的描述,大家知道这个同步任务是会定时同步后续新增的事件的。这就意味着上面的Event Handler没有存在的必要了哦,大家觉得是不是呢?
  4. 经过2,3两步,我们确保了最后conferenceEvents表中的所有历史事件和不断新增的增量事件都会自动同步到新的事件表(conferenceSeatInventoryEvents)了。也就是我们上面的目标中的目标1;接下来我们开始思考如何生成库存聚合根的CQRS的读库数据的同步。库存聚合根的读库数据的更新比较简单,只需要设计另一个独立的事件扫描任务(Event Scan Task),该任务的职责是扫描conferenceSeatInventoryEvents表,扫描当前所有未扫描过的后续事件,并逐个更新到读库。我们需要在内存中记录当前扫描到哪条记录了(并需要定时把扫描进度保存起来,比如写到某个文件里,已应付这个扫描任务意外停止或者机器意外关机的情况,这样的话我们就可以从文件加载上次扫描的最后一个位置,从那个位置之后再继续扫描),然后每次从哪条记录之后继续扫描即可。一次可以扫描最多1000条记录,间隔10s扫描一次。这些我们可以自己配置。看过ENode事件表设计的朋友应该知道,ENode设计的事件表,有一个自增的Sequence字段,该字段设计的目的就是用于事件的增量扫描或同步的,呵呵。如果没有Sequence字段,我们就无法知道记录添加进表的全局顺序了。好了,通过这个扫描任务,我们也就可以通过异步的方式,最终确保conferenceSeatInventory聚合根的读库会更新了。
  5. 接下来我们思考如何处理队列中还未被消费的命令和事件。为什么要考虑这个问题?因为我们的要求是希望可以做到尽量不停机发布的。也就是说,这个数据的迁移以及切换的过程尽量对用户透明。而要在做切换时,我们必须确保新老聚合根的库存相关的事件以及读库的状态必须完全一致的。否则,当切换到新的聚合根时,由于老的聚合根中还有一些命令或事件还没被处理,那就会导致我们切换到新的聚合根后,用的是旧(过时)数据,这样就不对了。那怎么解决这个问题呢?由于前面第一步,我们已经把相关的topic独立了出来,有:ChangeConferenceInventoryCommandTopic、ConferenceInventoryChangedEventTopic这两个topic。那我们只要确保这两topic下的所有的消息都消费完成就行了。但是只要还有新的消息进入到这两个topic的queue,那就不可能有这个时候。幸好EQueue在设计之初,就考虑了这种情况,EQueue支持禁用某个Queue,禁用后,就不允许消息发送者往这个Queue中发消息了,但是这个Queue中的消息还可以允许被消费。所以,当我们想开始做切换前,可以先把这两个topic的所有Queue都禁用掉。然后就确保了不会有新的消息能发送成功。然后只需要等待短暂的几秒,等待这两个topic下的所有的消息都被消费完(可以通过查看EQueue管理控制台上队列的消费进度知道是否消息都消费完了),且上面所说的2,3,4三个步骤也都处理了所有的事件,那就意味着新老聚合根的所有库存相关的事件和读库数据都已经完全一致了。然后我们就可以进行切换了。
  6. 怎么切换呢?只需要把发送命令的源头进行切换即可。就是把原来发给conference聚合根的修改库存的命令,现在改为发送给conferenceSeatInventory聚合根的修改库存的命令。我们可以把所有可能会发送相关这些命令的服务器部署新代码然后重新发布一下即可。当然这个发布过程可能也需要几分钟时间。所以,大家可以看到,整个切换的过程,可能需要几分钟左右的时间。

总结

上面的思路我想了好几天,真是费了我不少的脑细胞。传统的面向DB的数据迁移方案,一般也有全量数据迁移和增量数据迁移,以及最后的切换操作。

而ENode在处理这种情况时,由于整个架构是EDA的架构,在数据迁移时我们正好可以利用EDA架构的优点(当什么事件发生时,外部可以被通知到,并且我们可以随时新增额外的Event Handler来做额外的事情)。上面的几步思路,就是利用了这种思路,从而可以做到对这个程序无侵入的前提下完成数据的迁移和系统的切换。

另外一点,不知道大家发现没,上面的数据切换其实也没想象中那么复杂,仔细想一下,我们只需要同步事件,并把同步后的事件再进一步更新读库,最后禁用消息队列,再等待消息队列中的消息都消费完成,最后切换即可。为什么呢?因为事件的不变性。我们的事件表里的记录是绝对不会变的,不像传统的DB的数据迁移方案,表中的每一行记录都有可能变化或者删除,这就会数据迁移带来很大的障碍。而ES的架构,因为事件记录的不变性,所以也让我们在进行数据迁移时,能够变得简单。而且,我们也合理的利用了CQRS架构的特性,即读库的更新是完全根据事件表的。所以,只要我们确保事件表都绝对完全同步完成了,那总能确保最后的读库也能同步完成。这点也是非常爽的。

最后我在思考的问题是,是否有可能把切换过程缩短到几秒呢?呵呵。应该是可以的,就是我们可以预先把所有要修改的代码先改好,然后通过某个配置项来决定当前要运行哪个代码,发送哪个命令。然后当需要切换时,我们只需要修改配置项即可。这个可以通过zookeeper等工具实现即可。这样整个切换过程就只需要短短几秒即可,对用户不会造成很大影响。而且如果发现切换过去有问题,还可以随时再切换回来。做到方案的可回滚性。

最后,大家可以再想想这个数据迁移和切换的方案,哪些地方可以被复用呢?