当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router。它所管理的一些目标actor叫做routees
根据不同的情况需要,Akka提供了几种路由策略。当然也可以创建自己的路由及策略。
一个简单的Router
如何使用Router 还有如何管理routees。
Akka提供的路由策略如下:(详细见下文路由的使用)
akka.routing.RoundRobinRoutingLogic 轮询
akka.routing.RandomRoutingLogic 随机
akka.routing.SmallestMailboxRoutingLogic 空闲
akka.routing.BroadcastRoutingLogic 广播
akka.routing.ScatterGatherFirstCompletedRoutingLogic 分散聚集
akka.routing.TailChoppingRoutingLogic 尾部断续
akka.routing.ConsistentHashingRoutingLogic 一致性哈希
创建Router Actor
创建router actor 有两种方式:
1. Pool(池)——routees都是router 的子actor,如果routees终止,router将把它们移除
2.Group(群组)——routees都创建在router的外部,router通过使用actor来选择将消息发送到指定路径,但不监管routees是否终止。
Router actor 向 routees 发送消息,与向普通actor发送消息一样通过其ActorRef。Router actor 不会改变消息的发送人,routees 回复消息时发送回原始发件人,而不是Router actor。
Pool(池)
可以通过配置并使用代码在配置中获取的方法来实现 (例如创建一个轮询Router向5个routees发送消息)
也可以只使用编程的方式来实现
远程部署Router
既可以创建本地actor来作为Router,也可以命令Router在任一远程主机上部署子actor。
需要将路由配置放在RemoteRouterConfig下,在远程部署的路径类中要添加akka-remote模块
发送者
默认情况下,当一个routee发送消息,它隐式地设置自己为发送者。
sender() ! x // replies will go to this actor
但是通常将routee的父Router作为发送者更有用处。这样可以将routees的信息隐藏起来。
sender().tell("reply", context.parent) // replies will go back to parent
sender().!("reply")(context.parent) // alternative syntax (beware of the parens!)
监管
在Pool中,routees是Router 的子actor,所以Router也负责监管。
可以使用Pool 的supervisorStrategy属性来配置监管策略。如果没有提供配置,默认策略是“always escalate”(总是上升)。就是错误都会上传到Router的监管者进行处理。但是Router的监管者会认为是Router本身发生错误,因此会重启Router,所有的routees也将被重启,这样导致监管的效率变低。所以我们要指定监管策略。
Group(群组)
有时我们需要单独的创建routees,然后提供一个Router来供其使用。可以通过将routees的路径传递给Router的配置,消息将通过ActorSelection来发送到这些路径。
同样Group像Pool一样有两种方法创建。(为三个routee actor创建Router)
编程方式实现:
路由的使用
(具体配置见Routing)
1.RoundRobinPool 和 RoundRobinGroup
Router对routees使用轮询机制
2.RandomPool 和 RandomGroup
Router随机选择routees发送消息
3.BalancingPool
尝试从繁忙的routee重新分配任务到空闲routee,所有的routee共享一个mailbox
4.SmallestMailboxPool
Router创建的所有routees中谁邮箱中的消息最少发给谁
5.BroadcastPool 和 BroadcastGroup
广播的路由器将接收到的消息转发到它所有的routee。
6.ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup
将消息发送给所有的routees,然后等待到收到第一个回复,将结果发送回原始发送者。其他的回复将被丢弃
7.TailChoppingPool 和 TailChoppingGroup
将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。
此Router的目标是通过查询到多个routee来减少延迟,假设其他的actor可能比第一个actor更快响应。
8.ConsistentHashingPool 和 ConsistentHashingGroup
对消息使用一致性哈希(consistent hashing)选择routee
有三种方式定义哪些数据作为一致性哈希键
· 你可以定义路由的hashMapping,将传入的消息映射到它们一致哈希键。这使决策对发送者透明。
· 这些消息可能会实现akka.routing.ConsistentHashingRouter.ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。
· 消息可以被包装在一个akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。
特殊处理的消息
Broadcast消息
用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。
PoisonPill消息
无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。但仍然能影响到routees,因为Router停止时它的子actor也会停止,就可能会造成消息未处理。因此我们可以将PoisonPill包装到Broadcast消息中。这样Router所管理的所有routees将会处理完消息后再处理PoisonPill并停止。
Kill消息
当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。Router将抛出ActorKilledException并失败,然后Router根据监管的策略,被恢复、重启或终止。
Router的子routee也将被暂停,也受Router监管的影响,但是独立在Router外部创建的routee将不会被影响。
Managagement消息
发送akka.routing.GetRoutees到一个Router actor,使其回送一个包含当前使用routee的akka.routing.Routees消息。
发送akka.routing.AddRoutee到一个Router actor会将那个routee添加到其routee集合中。
发送akka.routing.RemoveRoutee到一个Router actor将从其routee集合删除该routee。
发送akka.routing.AdjustPoolSize到一个Pool (Router actor)将从其routee集合中添加或删除该数目的routee。
动态改变大小的池
大多数Pool的routees数量是固定的,但是可以制定调整策略来动态改变routees的数量
两种调整方式(resizer): 默认的Resizer和OptimalSizeExploringResizer
Resizer:是根据压力的大小来调整的,繁忙的routees 占总数的百分比高于或低于某个阈值就会进行调整
OptimalSizeExploringResizer :将池的大小调整为最佳大小,来提供最多的消息吞吐量。
它跟踪每个Pool 的消息吞吐量来定期执行以下三个调整操作。
· 如果在一段时间内没有看到所有routees被充分利用的,就缩小规模。
· 随机探索附近的池大小,以尝试收集吞吐量指标。
· 对附近的池大小进行优化(比其他任何附近池的)吞吐量指标更好。
Akka中的路由是如何设计的
从表面看,Router就像普通的actor,但是它们实际实现是不同的。路由器在收消息和快速发消息给routee被设计的极度优化。
Router可以通过优化原有消息处理pipeline来支持多线程,从而达到更高的吞吐量。通过将路由策略直接嵌入到其ActorRef ,发送到路由器ActorRef 的消息可以直接被路由到routees,直接跳过了单线程的Router actor。
自定义Router
在你创建自己的Router之前,要考虑到普通的actor性能不如Router actor高,因此你的程序可以接受较低的消息吞吐量。
假设你想获得最大的性能,请参考
配置调度器
为子Pool创建调度器将从Props中取,在调度器中有描述
为了可以很容易地定义池中routee的调度器,你可以在配置的部署一节中定义内联调度器。
启用一个池专用调度器,你唯一需要做的Router的“头”,不能总在相同的调度器上运行,因为它处理各种各样不同类型的消息,因此这个“头”是个特殊的actor,不使用Props验证的调度器。但是在RouterConfig中的 routerDispatcher 是标准Router actor默认调度器,可以在Router 的构造器和工厂方法中配置。 即使是自定义的Router也必须要实现默认调度器。
网友评论