Command模型
在基于CQRS的应用程序中,领域模型(如Eric Evans和Martin Fowler所定义的)可以是一个非常强大的机制,用于处理状态更改验证和执行过程中涉及的复杂性。虽然典型的领域模型有大量的构建块,但是其中一个在应用于CQRS中的命令处理时扮演主导角色:聚合。应用程序中对状态更改的命令以Command开头。
命令描述了你想要做什么以及基于该意图采取行动所需的信息。命令模型用于处理传入的命令,以验证它并处理结果。在这个模型中,一个Command Handler负责处理某种类型的命令,并根据其中包含的信息采取行动。
聚合
聚合是始终保持一致状态的实体或实体组。聚集根是负责维护此一致状态的聚合树顶部的对象。这使得Aggregate成为在任何基于CQRS的应用程序中实现命令模型的主要构建模块。
注意:术语“集合”是指Evans在领域驱动设计中定义的集合:“一组关联对象,作为数据更改的单元。 外部引用仅限于Aggregate的一个成员——它被指定为聚合根, 并且,聚合还包含了一组一致性规则被应用于其内部。“
例如,“联系人”聚合可以包含两个实体:联系人和地址。为了保持整个聚合状态一致,向联系人添加地址应通过联系人实体完成。在这种情况下,联系人实体是指定的聚合根。在Axon中,聚合由一个聚合标识来标识。聚合可以是任何对象,但是对于标识对象本身需要有几条准则,
- 它必须实现equals和hashCode以确保与其他实例进行唯一性区分;
- 实现一个提供一致结果的toString()方法(相同的标识符应该提供一个相等的toString()结果)和
- 是可序列化的。
测试组件(请参阅测试章节)将验证这些条件来验证聚合是否使用不兼容的标识符。 String、UUID和数字类型的标识符总是合适的。不要使用原始类型作为标识符,因为它们不允许延迟初始化,并且在某些情况下,Axon可能会错误地将原始类型的默认值假定为标识符的值。
使用随机生成的标识符被认为是一个好习惯。不应该按顺序生成标识符,因为使用顺序标识符会大大降低应用程序的可伸缩性,使用UUID会使得发生冲突的机会非常小。使用具有业务含义的数据作为标识符时要小心,他们有随业务变化而变化的趋势,导致您很难相应地调整你的应用程序。
聚合实现
一个聚合总是通过一个称为聚合根的实体来访问。通常,该实体的名称与聚合的名称完全相同。例如,一个订单集合可以由一个订单实体组成,该实体引用多个订单行实体。订单和订单一起,形成聚合。
聚集是一个常规的对象,它包含改变状态的状态和方法,虽然根据CQRS原则不完全正确。也可以通过读取方法获取聚合的状态。
聚合根必须声明包含聚合标识符的字段。该标识符必须最迟在第一个事件发布前被初始化。该标识符字段必须由@AggregateIdentifier批注注释。如果您使用JPA并在聚合上使用JPA批注,则Axon也可以使用JPA提供的@Id注解。
聚集可以使用AggregateLifecycle.apply()方法来注册要发布的事件。与EventBus不同的是,EventBus的消息内容需要包装在EventMessage中,而apply()允许您直接传递消息内容。
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply; @Entity // Mark this aggregate as a JPA Entity public class MyAggregate{ @Id //When annotating with JPA @Id, the @AggregateIdentifier annotation is not necessary private Stringid; // fields containing state... @CommandHandler public MyAggregate(CreateMyAggregateCommand command){// ... update state apply(newMyAggregateCreatedEvent(...)); } // constructor needed by JPA protected MyAggregate(){ } }
通过定义@EventHandler批注的方法,Aggregate中的实体可以监听Aggregate发布的事件。 这些方法将在EventMessage发布时被调用(在任何外部处理程序发布之前)
事件源聚合
除了存储Aggregate的当前状态之外,还可以根据它过去发布的Events来重建Aggregate的状态。为此,所有的状态改变必须由一个Event来表示。
总的来说,事件源集合类似于“常规”的集合:它们必须声明一个标识符并且可以使用apply方法来发布事件。但是,事件源集合中的状态更改(即字段值的任何更改)必须在@EventSourcingHandler注解的专门方法中执行。这包括设置聚合标识符。
请注意,聚合标识符必须在聚合发布的第一个事件的@EventSourcingHandler中设置。这通常是创建事件。
Event Sourced 聚合的聚合根源也必须包含无参数构造函数。 Axon Framework在通过事件初始化它之前会使用此构造函数创建一个空的Aggregate实例。加载聚合时,未能提供此构造函数将导致异常。
public class MyAggregateRoot{ @AggregateIdentifier private StringaggregateIdentifier; // fields containing state... @CommandHandler public MyAggregateRoot(CreateMyAggregate cmd){ apply(newMyAggregateCreatedEvent(cmd.getId())); } // constructor needed forreconstruction protected MyAggregateRoot(){ } @EventSourcingHandler private void handleMyAggregateCreatedEvent(MyAggregateCreatedEvent event){
// make sure identifier is alwaysinitialized properly
this.aggregateIdentifier = event.getMyAggregateIdentifier();
// ... update state
}}
@EventSourcingHandler注解的方法使用特定的规则进行处理,这些规则对于@EventHandler注解的方法是相同的,并且在“注解事件处理器”.章节中有详细的解释。
注意事件处理程序方法可以是私有的,只要JVM的安全设置允许Axon框架更改方法的可访问性即可。这使您可以清楚地将聚合的public方法(暴露该方法用于生成领域事件)与处理事件的内部逻辑的private方法分开。如果具有特定注释的private方法出现“未使用私有方法”警告,大多数IDE都可以选择忽略。如果不会忽略,您可以添加一个@SuppressWarnings(“UnusedDeclaration”)注释来确保你不会意外删除这些私有方法。
由于在通过事件的回放来重建聚合状态时,也会调用事件处理程序方法,因此必须采取特殊的预防措施避免这些回放的事件被不该处理的程序处理,例如对客户的消息通知。
可以在Event Sourcing Handler的方法内部调用apply()来发布新事件。这使得实体B可以通过apply一个事件来与实体A交互完成某事。 重放历史事件时,Axon将暂时忽略apply()中的事件,这些事件将仅仅在所有实体都接收到第一个事件(重放事件)之后才会被发布给它们。如果需要发布更多事件,根据当前apply事件后的实体状态,可以使用 apply(...).andThenApply(...);
你也可以使用静态的AggregateLifecycle.isLive()方法来检查聚合是否是“活的”。基本上,如果聚合完成重放历史事件,则认为聚合是“活的”。在重播这些事件时,isLive()将返回false。使用这个isLive()方法,您可以执行只在非重放的事件中完成的活动。
复杂的聚合结构
复杂的业务逻辑通常需要的不仅仅是聚合根可以提供的聚合。在这种情况下,将复杂性分散到聚合中的许多实体是很重要的。在使用event sourcing时,不仅聚合根需要使用事件来触发状态转换,而且聚合中的每个实体也是如此。
注意,一个关于聚合不应该暴露状态的常见误解是:聚合中的任何实体都不应该含有属性访问方法,不是的。实际上,如果聚合中的实体向同一聚合中的其他实体暴露状态,则聚合可能会受益匪浅。但是,建议不要把状态暴露到聚合之外。
Axon为复杂聚合结构中的event sourcing提供支持。声明子实体的字段必须使用@AggregateMember进行注释。 此注释告诉Axon注释的字段包含应该检查命令和事件处理程序的类。当一个实体(包括聚集根)应用一个事件时,它首先由聚合根处理,然后通过所有@AggregateMember注释字段向下传递到其子实体。
可能包含子实体的字段必须使用@AggregateMember进行注释。 此注释可用于多种字段类型:
- 实体类型,在字段中直接引用;
- 集合类型(包含所有集合,如Set,List等);
- java.util.Map类型;
处理聚合中的命令
建议直接在包含命令所要处理状态的聚合中定义命令处理程序,因为命令处理程序需要该聚合的状态来完成其工作。
要在聚合中定义命令处理程序,只需使用@CommandHandler注释命令处理方法即可。@CommandHandler批注方法的规则与任何处理程序方法相同。 但是,命令不仅可以根据其有效负载进行路由,还可以通过消息的名称,该名称默认为Command对象的全限定类名称。
默认情况下,@ CommandHandler注释的方法允许使用以下参数类型:
- 第一个参数是命令消息的有效载荷。它可以是Message或CommandMessage类型,如果@CommandHandler注解没有明确定义处理程序可以处理的命令的名称,则默认情况下,命令名称是命令有效负载的完全限定类名称。
- 使用@MetaDataValue注解的参数将使用注解中给的value作为键去查找消息元数据中的值。如果对这个元数据并非一定要存在(默认),则当不存在时返回null。如果必须要存在,则命令的解析器不会把命令匹配到这个处理方法,以防止在元数据值不存在时调用该方法。
- MetaData类型的参数将注入一个CommandMessage的整个MetaData。
- UnitOfWork类型的参数获取当前注入的工作单元。这允许命令处理程序对要在工作单元的特定阶段执行的操作进行注册,或获取对其注册的资源的访问。
- 类型为Message或CommandMessage的参数将获得完整的消息,同时包含有效内容和元数据。如果一个方法需要多个元数据字段或消息封装的其他属性,这很有用。
为了让Axon知道Aggregate类型的哪个实例应该处理Command消息,必须使用@TargetAggregateIdentifier来注释Command对象中携带AggregateIdentifier的属性。 注释可以放在字段或访问器方法(例如getter)上。
创建Aggregate实例的命令不需要标识目标集合标识符,但建议也在其上注解Aggregate标识符。如果您希望使用其他机制来路由命令,则可以通过提供自定义行为来覆盖CommandTargetResolver的行为。 该类应根据给定的命令返回聚合标识符和预期版本(如果有)。
注意:当@CommandHandler注释放置在一个Aggregate的构造函数中时,相应的命令将创建该聚合的一个新实例并将其添加到存储库。这些命令不需要定位特定的聚合实例。因此,这些命令不需要任何@TargetAggregateIdentifier或@TargetAggregateVersion注释,也不会为这些命令调用自定义的CommandTargetResolver。当一个命令创建一个聚合实例时,该命令的回调将在该命令成功执行时收到聚合标识符。
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply; public class MyAggregate{ @AggregateIdentifier private String id; @CommandHandler public MyAggregate(CreateMyAggregateCommand command){ apply(newMyAggregateCreatedEvent(IdentifierFactory.getInstance().generateIden tifier())); } // no-arg constructor for Axon MyAggregate() { } @CommandHandler public void doSomething(DoSomethingCommand command){ // do something... } // code omittedfor brevity. The event handler for MyAggregateCreatedEvent must se t the id field } public class DoSomethingCommand{ @TargetAggregateIdentifier private StringaggregateId; // code omitted for brevity }
Axon配置API可用于配置Aggregate。例如:
Configurer configurer = ... // to use defaults: configurer.configureAggreate(MyAggregate.class); // allowing customizations: configurer.configureAggregate( AggregateConfigurer.defaultConfiguration(MyAggregate.class).configureCommandTargetResolver(c -> new CustomCommandTargetResolver()) );
@CommandHandler注释不限于聚合根。 将所有命令处理程序放在根中有时会导致聚合根上的大量方法,而其中许多方法只是将调用转发给其中一个基础实体。 如果是这种情况,您可以将@CommandHandler注释放在其中一个底层实体的方法中。 为了让Axon找到这些带注释的方法,在聚合根中声明实体的字段必须用@AggregateMember标记。 请注意,注释@AggregateMember的字段会被用于对CommandHandlers的查找。 如果传入命令时该字段值为空,则会引发异常。
public class MyAggregate{ @AggregateIdentifier privateString id; @AggregateMember privateMyEntity entity; @CommandHandler public MyAggregate(CreateMyAggregateCommand command){apply(new MyAggregateCreatedEvent(...); } // no-arg constructor for Axon MyAggregate() { } @CommandHandler public void doSomething(DoSomethingCommand command){ // do something... } // code omitted for brevity. The event handler forMyAggregateCreatedEvent must se t the id field // and somewhere in the lifecycle, a value for "entity"must be assigned to be abl eto accept // DoSomethingInEntityCommandcommands. } public class MyEntity{ @CommandHandler public void handleSomeCommand(DoSomethingInEntityCommandcommand){ // do something } }
请注意,每个命令在聚合中必须只有一个处理程序。这意味着你不能使用@CommandHandler注解多个实体(无论是否聚合根)处理相同的命令类型。如果您需要有条件地将命令路由到实体,则这些实体的父级应处理该命令,并根据所应用的条件转发该命令。该字段的运行时类型不必完全是声明的类型。 但是,只有@AggregateMember所注释字段的声明类型才被用于查找其内部的@CommandHandler方法。
也可以使用@AggregateMember注解包含实体的Collection和Map。在后一种情况下,Map的value应包含实体,而key包含一个引用值。
由于需要将命令路由到正确的实例,因此必须正确标识这些实例。他们的“id”字段必须用@EntityId注释。
命令中有一个属性用于查找应该路由哪个实体,它的值默认为注释字段的名称。例如,当注释字段“myEntityId”时,该命令必须定义具有相同名称的属性。这意味着必须存在getMyEntityId或myEntityId()方法。如果字段的名称和路由属性不同,则可以使用@EntityId(routingKey =“customRoutingProperty”)显式提供一个值。
如果在带注释的集合或映射中找不到实体,则Axon会抛出IllegalStateException;显然,聚合在该此时无法处理该命令。
注意:Collection或Map类型的字段声明应包含适当的泛型,以允许Axon识别Collection或Map中包含的实体的类型。 如果无法在声明中添加泛型(例如因为您正在使用已定义泛型类型的自定义实现),则必须在@AggregateMember注释中指定entityType属性中使用的实体类型。
外部命令处理程序
在某些情况下,不可能或不希望将命令直接路由到聚合实例。 在这种情况下,可以注册一个Command Handler对象。
Command Handler对象是一个简单的(常规)对象,具有@CommandHandler注释的方法。 与Aggregate的情况不同,Command Handler对象只有一个实例,它处理它在其方法中声明的所有类型的命令。
public class MyAnnotatedHandler{ @CommandHandler public void handleSomeCommand(SomeCommandcommand, @MetaDataValue("userId")StringuserId) { // whatever logic here } @CommandHandler(commandName = "myCustomCommand") public void handleCustomCommand(SomeCommand command){ // handling logic here } }把注解的处理器注册到命令总线中:
Configurer configurer = ... configurer.registerCommandHandler(c-> new MyAnnotatedHandler());
从命令处理程序返回结果
在某些情况下,调度Command的组件需要有关Command的处理结果的信息。 命令处理程序方法可以返回一个值。 该值将作为命令的结果提供给发送者。一个例外是Aggregate构造函数上的@CommandHandler。 在这种情况下,不是返回方法的返回值(它是Aggregate本身),而是返回@ AggregateIdentifier注解字段的值;
注意:虽然可以从命令返回结果,但应该少用。 命令的意图不应该是获取值,因为这将表明该消息应该被设计为查询消息。 命令返回结果的典型情况是新建的实体的标识符。