Messaging concepts

作者: water_lang | 来源:发表于2018-01-22 11:31 被阅读31次

    Axon的核心概念之一就是通讯。组件之间的所有通信都使用消息对象完成。这为这些组件提供了位置透明性,以便在必要的时候能够进行扩展和分发这些组件。

    尽管所有这些消息都实现了Message接口,但不同类型的消息及他们的处理方式有很大的区别。

    所有消息都包含有payload(主体),元数据和唯一标识符。消息里的payload就代表消息要做什么事的具体描述。这个对象的类名和它携带的数据的组合描述了消息在应用程序中的含义。元数据可以记录这个消息的上下文(注:比如他是从哪里发送的,发送时间等)。例如,您可以存储跟踪信息,以便跟踪消息的来源或原因。您还可以存储信息来表示正在执行命令的安全上下文。

    注意:所有的消息都是不可变的。在Message中存储数据实际上意味着根据前一个Message创建一个新的Message,并添加额外的信息。这保证了消息在多线程和分布式环境中的使用是线程安全的。

    Commands

    Commands的意图就是为了改变应用程序的状态。它们其实是一个(最好设置为只读)POJO,只不过他们是用CommandMessage的一个实现来进包装的。

    Commands总是只有一个目的地。尽管发送者并不关心哪个组件会处理它或者组件所在的位置,但是他可能会对最后的结果感兴趣。这就是为什么通过命令总线发送的命令消息允许返回结果的原因。

    Events

    Events描述应用程序中发生的事件的对象。事件的通常来源是聚合。当Aggregate(聚合)内发生了一些重要的事,就会产生事件。在Axon框架中,事件可以是任何对象。强烈建议您确保所有的事件都是可序列化的。

    当事件被分发出去后,Axon会将其封装成一个EventMessage。使用的消息的实际类型取决于事件的来源。当Aggregate聚合产生一个事件,Axon会将其封装成一个DomainEventMessage(他继承于EventMessage)。所有其他事件都会被封装成一个EventMessage。除了常见的消息属性,如唯一标识符,EventMessage还包含一个时间戳。DomainEventMessage额外包含了产生该事件的聚合的类型及标识符。它还包含聚集事件流中事件的序列号,它允许重现事件的顺序。此外它还包含聚合产生的事件流中事件的序列号,以便可以进行事件溯源(也就是事件是有序的)。

    注意:

    尽管DomainEventMessage包含了对Aggregate Identifier(聚合唯一标识)的引用,但是你也应始终在实际的Event中包含这个标识符。EventStore使用DomainEventMessage的Identifier来存储这些事件。

    原始的Event对象存储的EventMessage的Payload。并且,可以将payload外的其他信息放进Event Message的Meta Data(元数据)中。元数据的目的是存储有关事件的附加信息,而不是业务信息。审计信息就是一个典型的例子。它允许您查看在哪些情况下引发了事件,比如是哪个用户帐户处理的,或者处理该事件的机器的名称。

    注意:

    一般来说,您不应该根据事件消息的元数据中的信息来做出业务决策。如果是这样的话,你可能会有附加的信息,那么他应该成为事件本身的一部分。元数据通常用于报告,审计和跟踪。

    虽然不是强制的,但是最佳实践的做法是将领域事件定义成不可变的(注:也就是只提供getter方法),事件里的所有字段都是通过构造函数的方式进行初始化。如果构造方法比较复杂(注:比如字段太多什么的),可以考虑使用Builder模式来做。

    注意:

    尽管领域事件从在技术上来说是代表聚合状态的变化,但你也应该尝试在事件中捕捉到状态的意图。一个比较好的方法就是抽象出一个上层的事件抽象类,他代表事情已经发生的这种事实。并且使用该抽象类的一个具体的子实现来表示这个改变的意图。例如,你有一个抽象类AddressChangedEvent代表已经发生的事实,以及两个实现ContactMovedEvent和AddressCorrectedEvent的代表状态改变的意图。一些监听器不关心这些意图(例如,数据库更新事件监听器),将监听抽象类型。而其他监听器则刚关心的这些意图,它们将监听具体的子类(例如发送一个地址变更确认电子邮件给消费者)。

    下图的虚线上面代表状态改变的事实,虚线的下面代表状态要改变的意图

    图片.png

    当你发布一个Event到EventBus(事件总线)上时,你须要将其包装成一个Event Message。GenericEventMessage类可以帮助你完成这个梦想。您可以使用他的构造函数或静态的asEventMessage()方法来完成。后者会检查给定的参数是否已经实现了消息接口。如果是这样,它要么直接返回(如果它实现了EventMessage),要么返回一个使用给定消息的payload和元数据来实例化的GenericEventMessage。如果一个事件是被Axon框架中的Aggregate发布的,Axon会自动把该事件包装在一个DomainEventMessage中并且包含聚合的标识符,类型和序列号。

    查询

    查询描述了对信息或状态的请求。一个查询可以有多个handler。当发布查询时,客户端会表明他是想要一个handler还是来自所有可用的handler的结果。

    工作单元

    Unit of Work(工作单元)是Axon框架中的一个重要概念,尽管在大多数情况下,您不可能直接与他直行交互. 消息的处理过程被视为一个单元。工作单元的目的是协调消息处理(命令,事件或查询)期间的操作。组件可以在工作单元注册每个阶段要执行的操作,例如onPrepareCommit或onCleanup。

    您不太可能需要直接访问工作单元。它主要由Axon提供的构建块使用。如果您确实需要访问它,无论出于何种原因,有几种方法可以获得它。Handler可以在处理函数中通过一个参数来接收工作单元。如果你使用注解的话,你可以将一个UnitOfWork注解参数添加到你的方法上面。如果你想在其他地方使用他,可以通过调用CurrentUnitOfWork.get()来获取绑定到当前线程的工作单元。请注意,如果没有工作单元绑定到当前线程,则此方法将引发异常。你可以使用CurrentUnitOfWork.isStarted()来检查他是否可用。

    要求访问当前工作单元的一个原因是在消息处理过程中附加需要多次重用的资源,或者在工作单元完成时需要清理创建的资源。在这种情况下,unitOfWork.getOrComputeResource()和onRollback()及afterCommit()、onCleanup()等生命周期回调方法允许您注册资源并声明在处理本工作单元期间要执行的操作。

    注意:须注意,工作单元只是一个变化的缓冲区,而不是能事务的替代品。虽然当工作单元在committed时,代表工作单元内所有的阶段更改已经committed,但是这种行为不是原子性的操作。这意味着,当一个提交失败时,一些已经发生状态改变的已经被持久化了,而另一些则没有(注:就导致数据不一致)。最佳实践的方式是一个Command不应包含多个操作。如果你坚持这种做法,一个工作单元只包含一个单一的行为,从而将使数据状态最终处于一致性。如果您在工作单元中有多个行为操作,那么您可以考虑将事务附加到工作单元的提交。使用unitOfWork.onCommit(..)方法来注册工作单元commit时需要采取的操作。

    在处理消息时,您的处理程序(handler)可能会抛出异常. 默认情况下,未经检查的异常将导致UnitOfWork回滚所有的更改。这样就不会有状态不一致的行为产生。

    Axon提供了一些开箱即用的回滚策略:

    RollbackConfigurationType.NEVER 无论发生什么,都将始终提交工作单元,

    RollbackConfigurationType.ANY_THROWABLE只要发生异常就进行回滚,

    RollbackConfigurationType.UNCHECKED_EXCEPTIONS将在Errors和Runtime Exception(运行时异常)上回滚

    RollbackConfigurationType.RUNTIME_EXCEPTION,只在Runtime Exceptions运行时异常(但不是在Errors上)上回滚

    使用Axon组件处理消息时,工作单元的生命周期将自动为您管理。如果您选择不使用这些组件,而是实现自己的处理,则需要以编程方式启动并提交(或回滚)工作单元。

    在大多数情况下,DefaultUnitOfWork将为您提供所需的功能。他希望是在单线程中处理整个过程。如果要在工作单元执行一个任务,只需在一个新的DefaultUnitOfWork实例上调用UnitOfWork.execute(Runnable)或UnitOfWork.executeWithResult(Callable)即可。工作单元将在任务完成时启动并提交,如果任务失败则回滚。如果您需要更多控制,您也可以选择手动启动,提交或回滚工作单元。

    通常用法如下:

    UnitOfWork uow = DefaultUnitOfWork.startAndGet(message);

    // then, either use the autocommit approach:

    uow.executeWithResult(() -> ... logic here);

    // or manually commit or rollback:

    try {

    // business logic comes here

    uow.commit();

    } catch (Exception e) {

    uow.rollback(e);

    // maybe rethrow...

    }

    一个工作单元了解各个阶段。每当他从一阶段转变到另一个阶段时,都会通知UnitOfWork Listeners(UnitOfWork监听器)。

    • 激活阶段:工作单元通常在此阶段通过当前线程(通过CurrentUnitOfWork.set(UnitOfWork))注册。随后,消息通常在此阶段由消息handler处理。
    • 提交阶段:在完成消息处理之后但在提交工作单元之前,会调用onPrepareCommit监听器。如果工作单元绑定了事务,则会调用onCommitt监听器来提交事务。当提交成功时,调用afterCommit监听器。如果提交中有任何步骤失败,则会调用onRollbackt监听器。消息处理结果包含在工作单元的ExecutionResult中。
    • 清理阶段:在这个阶段,该工作单元(如锁)将所持有的任何资源进行释放。如果有多个工作单元且是嵌套的话,那么清理阶段将被推迟到外部的单元工作都准备好清理为止。

    消息处理过程可以被认为是一个原子性的;要么完全处理,要么根本不处理。 Axon Framework使用Unit Of Work来跟踪消息处理程序执行的操作。处理程序完成后,Axon将尝试执行在Unit of Work中注册的操作。

    你可以将一个事务绑定到工作单元上。许多组件(如CommandBus和QueryBus实现以及所有异步处理事件处理器)都允许您配置事务管理器。该事务管理器将被用于创建事务,以绑定到用于管理消息处理的工作单元。

    当应用程序组件在消息处理的不同阶段需要资源时(比如,数据库连接或实体管理器),这些资源能被附加到工作单元中。unitOfWork.getResources()函数允许你访问附加到当前工作单元中的资源。unitOfWork.getResources()函数允许你访问附加到当前工作单元中的资源。

    当嵌套的工作单元需要访问资源时,建议将其注册到工作单元的根目录,可以使用unitOfWork.root()进行访问。如果一个工作单位是根源,它就会简单地返回自己。

    相关文章

      网友评论

        本文标题:Messaging concepts

        本文链接:https://www.haomeiwen.com/subject/ghklaxtx.html