Actor 系统
每个Akka应用程序都必须创建一个名为Actorsystem的类。这个类代表Actor系统,其中包含了Actor对象的层次结构,该系统中的所有Actor对象都会使用同一套配置。下面的代码在本地JVM中创建了一个名为ReactiveEnterprise的ActorSystem对象:
import akka,actor._
val system=ActorSystem("ReactiveEnterprise")
要直接在user守护对象下方创建Actor对象,可通过system.actorof()方法使用Actorsystem对象,如下面的例子所示:
import akka.actor._
val system=ActorSystem("ReactiveEnterprise")
//创建Actor对象并获取该对象的ActorRef引用
val processManagersRef:ActorRef=system.actorOf(
Props[ProcessManagers],"processManagers")
//使用ActorRef 引用向Actor对象发送消息
processManagersRef!BrokerForLoan(banks)
val system=ActorSystem("ReactiveEnterprise")
val selection=system.actorselection("/user/processManagers")
selection! BrokerForLoans(banks)
actorselection()方法会返回Actorselection选择路径,而不会返回ActorRef引用。使用Actorselection对象可以向该路径指向的Actor对象发送消息。然而,请注意,与使用ActorRef引用的方式相比,通过这种方式发送消息的速度较慢并且会占用更多资源。但是,actorselection()方法仍旧是一个优秀的工具,因为它可以执行查询由通配符代表的多个Actor对象的操作,从而使你能够向Actorselection选择路径指向的任意个Actor对象广播消息。
val system=ActorSystem("ReactiveEnterprise")
wal selection=system.actorselection("/user/*")
selection!FlushAll()
实现Actor对象
Akka 框架中的所有Actor对象都必须扩展akka.actor.Actor特征。你编写的Actor 对象至少要支持receive代码块。
import akka.actor._
class Shoppingcart extends Actor{
def receive={
case _=>
}
}
Actor系统的生命周期示例:
import akka.actor._
class ShoppingCart extends Actor{
override def postRestart(reason: Throwable): Unit{
override def postStop(): Unit{
override def preRestart(
reason: Throwable, message: Option[ Any]): Unit {
override def prestart(): Unit {
def receive={
case _=>
}
}
每个Actor对象都拥有一个ActorContext对象,通过Actor对象中的context()方法可以获得该对象。Actorcontext对象为它的所有者对象提供了一种处理方式,使它的所有者能够以安全的、非破坏性的方式使用它的所有者包含的部分基础实现代码。这个例子使用ActorContext对象创建子Actor对象Task:
import akka.actor._
class TaskManager extends Actor{
def nextTaskName():String={
"task-"+...
def receive={
case RunTask(definition)=>
val task =context.actorof(Props[Task],nextTaskName)
task!Run(definition)
...
case TaskCompleted=>
...
}
消息通道
消息
消息中既含有系统级信息也含有应用程序级信息。消息由两个基本部分构成。
- 头部:供消息传输系统使用的信息。
- 主体:被传送的数据。
在使用Actor模型时,消息仅含有第二个部分,即主体。换言之,消息仅是在Actor对象间传递应用程序级数据,如图所示。如果存在供基础系统使用的任何头部信息,那么应用程序中的Actor对象不会看到这些信息。
管道和过滤器
消息路由器
管道和过滤器环境中通常会用到消息路由器。然而,消息路由器并不仅限于在这类环境中使用。各种消息路由器都有可能会被用到,这些路由器包括:有状态和无状态路由器、基于环境的和基于上下文的路由器、回环路由器和基于内容的路由器。路由器的常规功能是当收到消息时,路由器会检查消息的某个属性(或消息本身的某个状态)、环境的某个属性或上述所有这些元素,并通过符合技术或业务条件的消息通道分发当前的消息。换言之,消息路由器起的是分支机制的作用,与if-else代码块和分支语句非常相似,但使用独立的消息通道选择条件路径
消息译码器
使用消息译码器可以将消息中包含的数据转换为与本地环境兼容的数据,如图所示。在使用领域驱动的设计方式时,可以通过两种方式处理这种转换。
- 在使用端口和适配器架构(也称为六角形架构)时[DDD],可以使用应用程序边界的适配器[GoF]转换对应用程序的核心服务兼容参数影响较小的消息。也许该转换仅是将基于文本的消息转换为标量数据(API能够接受的合法参数)。在这种情况中,端口的适配器是一种通道适配器,而且本质上是一种适宜的译码器。
- 在处理较复杂的转换情况时,通道适配器应该使用反腐化层[DDD]。某些数据可能比较复杂,需要使用更积极的译码方式。这可能需要使用多个适配器和特殊的译码器,将大量的原始标量数据转换为本地领域中的模型化概念。
消息端点
在消息系统中,消息端点是指用于传输消息的消息通道两端的消息发送者和接收者。在使用Actor模型时,因为Actor对象既是消息的发送者也是消息的接收者,所以一般而言,Actor对象就是消息端点,如图所示。因此,基于Actor的系统中可能含有数千、数百万或数亿个消息端点。
点对点通道
在使用Actor模型时,所有Actor对象专用的消息通道都是点对点通道。这并不是说Actor模型不支持发布一订阅通道。订阅指定主题的消息/交换信息的Actor对象,能够通过点对点通道获得发布者发送的主题消息/交换信息。由此可见,使用点对点通道可以定义Actor模型的基础语义,如图5.1所示。与此类似,当Actor对象收到消息时,它还是一个由事件驱动的消费者。
发布一订阅通道
观察者模式和发布一订阅模式的基础设计原则,是将对事件消息感兴趣的对象与提供事件消息的对象隔离开。在观察者模式中,这两个角色被称为观察者和被观察者。在发布一订阅模式中,这两个角色被称为订阅者和发布者。实际上,在评估典型的消息传输中间件系统时,可以将持久的和非持久的发布一订阅通道都视为自带功能。Actor模型不是典型的消息传输系统,也不专门支持发布一订阅模式。Actor模型着重支持的是点对点通道,但使用Actor 模型提供的基础工具实现发布一订阅模式不会有任何问题。实际上,Akka框架确实提供了几个内置的发布一订阅工具。下图展示了使用Akka框架中的EventBus类创建发布一订阅通道的方式。
本地事件流
Akka框架内置的几个发布一订阅通道,其中之一使用了EventBus特征。
标准的EventBus实例称为事件流,通过本地Actor系统的eventstream属性可以获得该对象。这种标准的EventBus对象可以在本地JVM中使用,而且使你能够很轻松地注册订阅者和使发布者发布事件。
分布式发布一订阅通道
akka框架中的第二种内置发布一订阅通道,专门用于Akka集群。这种发布一订阅通道提供了跨集群节点,根据主题向订阅者发布消息的功能。这种通道还支持在发送者不知道接收者在集群中的具体位置的情况下,向集群中的单个Actor对象发送消息,以及向分布在多个节点中的一个Actor对象发送消息。这是一种经过修改的发布一订阅通道,支持通过低耦合度方式直接发送消息。
Akka框架提供了支持这种集群化发布一订阅功能的Actor类Distributed-PubSubMediator,如图所示。这个起中间人作用的Actor对象,必须在所有参与指定发布一订阅主题或发送者一接收者协作的节点上启动。尽管这个中间人Actor对象能够通过多个独立的发布者和接收者逻辑分组,支持任意数量的主题,但最好为每个发布一订阅逻辑分组启动专用的DistributedPubSubMediator对象。例如,你可以为bids主题启动一个中间人Actor对象,为so1d主题启动另一个中间人Actor对象,每个中间人Actor对象专门负责一组订阅者。
数据类型通道
当接收者应用程序需要在不检验消息内容的情况下,就必须了解收到消息的数据类型时,可使用数据类型通道。可根据不同的数据类型通道,创建不同类型的Actor对象。数据类型通道既具有消息通道的特点,又具有消息端点的特点。
非法消息通道
在消息通道中传输的消息只能是接收者能够处理的消息。换言之,消息通道应具有数据类型通道的特点。在使用Actor模型时,为指定Actor类型传输消息的数据类型通道可以接收多种类型的消息。所有发送给Actor对象的消息都必须遵守Actor对象的协定。你可以使用非法消息通道(如图所示),处理不符合协定的消息。
死信通道通常用于为无法送达的消息提供路由,而非法消息通道用于为接收者无法处理的消息提供路由。即便如此,在使用Akka框架时,也可以将非法消息发送给死信通道。
死信通道
当消息系统无法将消息送达接收者时,可以选择将消息发送给死信通道。实际上这也是Akka框架处理死信消息的方式
确保送达机制
当需要确保将指定的消息送达接收者时,应使用确保送达机制。通过Akka框架使用这种模式,可以确保至少将消息向其接收者发送一次。要做到这一点,需要将消息存储到消息存储器中,然后定期发送该消息直到收到用于确定该消息被送达的回执(也会被存储在消息存储器中)为止。除非特意删除某条消息,否则所有消息都会永久地被保存。
消息桥
在普通企业中,会存在各种架构机制(包括各种数据库和消息系统)产生的数据负荷。如果将两个消息系统无法协同操作的应用程序整合到一起,会出现怎样的情况呢?是否应该使用其他方式整合它们,如使用文件或数据库?如果使用企业集成模式[EIP]能够将两个应用程序整合到一起,那么如何在这两个应用程序各自使用自己熟悉的消息传输机制的情况下,整合它们的消息传输功能?使用消息桥可以解决这些问题。
消息总线
企业中含有许多独立的业务系统,这些业务系统包括从购买日用品的应用程序,到为了提高竞争力用于帮助整合商务伙伴的自定义应用程序的各种系统。尽管这些系统在不同的平台上运行并且拥有各种服务接口,而且每组服务接口都有专用的数据模型,但你仍然需要使所有这些系统一起协同工作。有时通过创建实现简单的面向服务架构的消息总线,可以获得最佳效果(如图所示)。这种消息总线必须将各个已整合的应用程序中的所有服务接口联到一起,并且使这些服务接口都使用相同的规范化消息模型。
网友评论