美文网首页Akka
Akka 2.5.11 路由 Routing

Akka 2.5.11 路由 Routing

作者: mango_knight | 来源:发表于2018-03-14 10:42 被阅读0次

    当处理到来的消息流时,我们需要一个actor来引导消息路由到目标actor,从而提高消息的分配效率。在Akka中这个 actor就是Router它所管理的一些目标actor叫做routees

    根据不同的情况需要,Akka提供了几种路由策略。当然也可以创建自己的路由及策略。

    一个简单的Router

    如何使用Router 还有如何管理routees。

    Akka提供的路由策略如下:(详细见下文路由的使用)

    akka.routing.RoundRobinRoutingLogic   轮询

    akka.routing.RandomRoutingLogic    随机

    akka.routing.SmallestMailboxRoutingLogic   空闲

    akka.routing.BroadcastRoutingLogic   广播

    akka.routing.ScatterGatherFirstCompletedRoutingLogic   分散聚集 

    akka.routing.TailChoppingRoutingLogic      尾部断续 

    akka.routing.ConsistentHashingRoutingLogic    一致性哈希


    创建Router Actor

    创建router actor 有两种方式:

                1. Pool(池)——routees都是router 的子actor,如果routees终止,router将把它们移除

                2.Group(群组)——routees都创建在router的外部,router通过使用actor来选择将消息发送到指定路径,但不监管routees是否终止。

    Router actor 向 routees 发送消息,与向普通actor发送消息一样通过其ActorRef。Router actor 不会改变消息的发送人,routees 回复消息时发送回原始发件人,而不是Router actor。

    Pool(池)

    可以通过配置并使用代码在配置中获取的方法来实现  (例如创建一个轮询Router向5个routees发送消息)

    也可以只使用编程的方式来实现

    远程部署Router

    既可以创建本地actor来作为Router,也可以命令Router在任一远程主机上部署子actor。

    需要将路由配置放在RemoteRouterConfig下,在远程部署的路径类中要添加akka-remote模块

    发送者

    默认情况下,当一个routee发送消息,它隐式地设置自己为发送者。

    sender() ! x // replies will go to this actor

    但是通常将routee的父Router作为发送者更有用处。这样可以将routees的信息隐藏起来。

    sender().tell("reply", context.parent) // replies will go back to parent

    sender().!("reply")(context.parent) // alternative syntax (beware of the parens!)

    监管

    在Pool中,routees是Router 的子actor,所以Router也负责监管。

    可以使用Pool 的supervisorStrategy属性来配置监管策略。如果没有提供配置,默认策略是“always escalate”(总是上升)。就是错误都会上传到Router的监管者进行处理。但是Router的监管者会认为是Router本身发生错误,因此会重启Router,所有的routees也将被重启,这样导致监管的效率变低。所以我们要指定监管策略。

    Group(群组)

    有时我们需要单独的创建routees,然后提供一个Router来供其使用。可以通过将routees的路径传递给Router的配置,消息将通过ActorSelection来发送到这些路径。

    同样Group像Pool一样有两种方法创建。(为三个routee actor创建Router)

    编程方式实现:


    路由的使用

    (具体配置见Routing

    1.RoundRobinPool RoundRobinGroup

    Router对routees使用轮询机制

    2.RandomPool RandomGroup

    Router随机选择routees发送消息

    3.BalancingPool

    尝试从繁忙的routee重新分配任务到空闲routee,所有的routee共享一个mailbox

    4.SmallestMailboxPool

    Router创建的所有routees中谁邮箱中的消息最少发给谁

    5.BroadcastPool BroadcastGroup

    广播的路由器将接收到的消息转发到它所有的routee。

    6.ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup

    将消息发送给所有的routees,然后等待到收到第一个回复,将结果发送回原始发送者。其他的回复将被丢弃

    7.TailChoppingPool 和 TailChoppingGroup

    将首先发送消息到一个随机挑取的routee,短暂的延迟后发给第二个routee(从剩余的routee中随机挑选),以此类推。它等待第一个答复,并将它转回给原始发送者。其他答复将被丢弃。

    此Router的目标是通过查询到多个routee来减少延迟,假设其他的actor可能比第一个actor更快响应。

    8.ConsistentHashingPool ConsistentHashingGroup

    对消息使用一致性哈希(consistent hashing)选择routee

    有三种方式定义哪些数据作为一致性哈希键

    ·    你可以定义路由的hashMapping,将传入的消息映射到它们一致哈希键。这使决策对发送者透明。

    ·    这些消息可能会实现akka.routing.ConsistentHashingRouter.ConsistentHashable。键是消息的一部分,并很方便地与消息定义一起定义。

    ·    消息可以被包装在一个akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope中,来定义哪些数据可以用来做一致性哈希。发送者知道要使用的键。


    特殊处理的消息

    Broadcast消息

    用于向Router所有的routee发送一条消息,不管该Router通常是如何路由消息的。

    PoisonPill消息

    无论哪个actor收到PosionPill消息都会被停止。但是对于PoisonPill消息Router不会将其传给routees。但仍然能影响到routees,因为Router停止时它的子actor也会停止,就可能会造成消息未处理。因此我们可以将PoisonPill包装到Broadcast消息中。这样Router所管理的所有routees将会处理完消息后再处理PoisonPill并停止。

    Kill消息

    当Kill消息被发送到Router,Router将内部处理该消息,并且不会将它发送到其routee。Router将抛出ActorKilledException并失败,然后Router根据监管的策略,被恢复、重启或终止。

    Router的子routee也将被暂停,也受Router监管的影响,但是独立在Router外部创建的routee将不会被影响。

    Managagement消息

    发送akka.routing.GetRoutees到一个Router actor,使其回送一个包含当前使用routee的akka.routing.Routees消息。

    发送akka.routing.AddRoutee到一个Router actor会将那个routee添加到其routee集合中。

    发送akka.routing.RemoveRoutee到一个Router actor将从其routee集合删除该routee。

    发送akka.routing.AdjustPoolSize到一个Pool (Router actor)将从其routee集合中添加或删除该数目的routee。


    动态改变大小的池

    大多数Pool的routees数量是固定的,但是可以制定调整策略来动态改变routees的数量

    两种调整方式(resizer): 默认的ResizerOptimalSizeExploringResizer

    Resizer是根据压力的大小来调整的,繁忙的routees 占总数的百分比高于或低于某个阈值就会进行调整

    OptimalSizeExploringResizer :将池的大小调整为最佳大小,来提供最多的消息吞吐量。

    它跟踪每个Pool 的消息吞吐量来定期执行以下三个调整操作。

        ·    如果在一段时间内没有看到所有routees被充分利用的,就缩小规模。

        ·    随机探索附近的池大小,以尝试收集吞吐量指标。

        ·    对附近的池大小进行优化(比其他任何附近池的)吞吐量指标更好。


    Akka中的路由是如何设计的

    从表面看,Router就像普通的actor,但是它们实际实现是不同的。路由器在收消息和快速发消息给routee被设计的极度优化。

    Router可以通过优化原有消息处理pipeline来支持多线程,从而达到更高的吞吐量。通过将路由策略直接嵌入到其ActorRef 发送到路由器ActorRef 的消息可以直接被路由到routees,直接跳过了单线程的Router actor。


    自定义Router

    在你创建自己的Router之前,要考虑到普通的actor性能不如Router actor高,因此你的程序可以接受较低的消息吞吐量。

    假设你想获得最大的性能,请参考


    配置调度器

    为子Pool创建调度器将从Props中取调度器中有描述

    为了可以很容易地定义池中routee的调度器,你可以在配置的部署一节中定义内联调度器。

    启用一个池专用调度器,你唯一需要做的

    Router的“头”,不能总在相同的调度器上运行,因为它处理各种各样不同类型的消息,因此这个“头”是个特殊的actor,不使用Props验证的调度器。但是在RouterConfig中的 routerDispatcher 是标准Router actor默认调度器,可以在Router 的构造器和工厂方法中配置。 即使是自定义的Router也必须要实现默认调度器。

    相关文章

      网友评论

        本文标题:Akka 2.5.11 路由 Routing

        本文链接:https://www.haomeiwen.com/subject/yyvxqftx.html