美文网首页
Akka Routing

Akka Routing

作者: lj72808up | 来源:发表于2020-10-22 20:12 被阅读0次

A Router Actor

  1. 有时, 需要把相同类型的消息发给一个 actor 集合, 以便消息可以并行处理 - 单个 actor 一次只能处理一个消息
    router角色是一个自包含 router 对象的 actor 类, 可以管理其下的 routee

  2. 有两种类型的 actor

    • Pool: router 创建出 routee 作为自己的子 actor. 当发现子 actor terminate 时, 将其从 router 中删除
    • Group: routee 是在 router 外部创建的, 并通过使用 actor selection 选择发送到哪个 routee, 无法监控其 termination.
  3. Pool 的方式:

    • 通过 router 发送消息给 routee 的方式和 发送给普通 actor 的方式没有差别, 只要将消息发送给 router 的 ActorRef; router actor 会把消息发送给 routee 并不改变源发送者, 当 routee 回应消息时, 回应会直接发给源发送者, 而不是发给 router actor
    • 关于 sender
      当 routee 使用 sender() ! x 回复消息时, 消息会略过 router actor, 直接发送给 router 的父 actor.
      但有时想让 routees 把消息回复给 router, 这样做的目的是隐藏 router 后面的 routee细节. 可以在 routee 中使用如下片段回复消息给 router
    context.parent ! WorkerReplay() 
    
    • demo
    /**
     *  第一种创建router的方式 (qucik start) , 同时测试 router 的消息透传
     */
    object TestRouter {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("MyTest") // akka://MyTest/system  akka://MyTest/user
        val starter = system.actorOf(Props[Starter], "starter-Entrance")
        starter ! 10 // 告诉 master 发送10个消息给worker
      }
    }
    
    class Starter extends Actor {
      val master: ActorRef = context.actorOf(Props[Master], "master-Entrance")
      override def receive: Receive = {
        case cnt: Int => {
          println(s"发送${cnt}条消息给master")
          for (i <- 1 to cnt) {
            master ! Work()
            Thread.sleep(200)
          }
        }
        case _: MasterReplay => println(s"starter收到master的回复: " + sender().path)
        case _: WorkerReplay => println(s"starter收到worker的回复: " + sender().path)
        case _ =>
      }
    }
    
    class Master extends Actor {
      //TODO 第一种创建router的方式
      var router = {
        var i = 1
        val routees = Vector.fill(5) {
          val r = context.actorOf(Props(classOf[Worker], i)) // 子actor
          i = i + 1
          context.watch(r)
          ActorRefRoutee(r)
        }
        Router(RoundRobinRoutingLogic(), routees)
      }
    
      override def receive: Receive = {
        case w: Work =>
          //      println(s"master 收到消息")
          router.route(w, sender())
        //      println(s"发给 master 的 sender 是${sender().path}")   // sender 是 starter
        case Terminated(a) =>
          router = router.removeRoutee(a)
          val r = context.actorOf(Props[Worker])
          context.watch(r)
          router = router.addRoutee(r)
        case _: WorkerReplay => {
          println("master 收到 worker 的回复")
        }
      }
    }
    
    class Worker(idx: Int) extends Actor {
      override def receive: Receive = {
        case _: Work => {
          println(s"worker " + idx + s" 号工作 (${self.path}), " + s"收到${sender().path}的消息")
          //TODO 切换下面两种 routee 回复消息的方式, 体会 router 的透传
          // sender() ! WorkerReplay()   // worker 会回复给started, router(master) 在消息回应时被隐藏
          context.parent ! WorkerReplay() // worker 会回复给 router(master)
        }
        case _ =>
      }
    }
    
    case class Work()
    
    case class MasterReplay()
    
    case class WorkerReplay()
    
  4. Group 的方式:

    • routee 并不是 router 的子actor, 而是在外部产生的. 通过指定外部 routee 的 path
    val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
    val router4: ActorRef =
      context.actorOf(RoundRobinGroup(paths).props(), "router4")
    

Router usage

  1. 像创建 actor 一样创建 router

    /**
     * 第二种创建 router 的方式: 像普通actor一样创建
     */
    class Parent extends Actor {
      val router: ActorRef =
        context.actorOf(RoundRobinPool(5).props(Props[ParentWorker]), "router2")
      override def receive: Receive = {
        case x:Int => {
          for (_ <- 1 to x) {
            router ! Work()
            Thread.sleep(200)
          }
        }
      }
    }
    
    class ParentWorker extends Actor {
      override def receive: Receive = {
        case _: Work => println(s"worker收到${sender().path}的消息 -- [${self.path}]")
        case _ =>
      }
    }
    
    object TestRouter2 {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("TestRouter2")
        val p = system.actorOf(Props[Parent], "parent")
        p ! 10
      }
    }
    
  2. SmallestMailboxPool
    这种 actor 有限发送给未挂起的, mailbox 中消息最少的 routee .其选择以如下顺序进行:

    • 选择一个空闲的(未在处理消息), mailbox 消息数为0的 routee
    • 选择一个 mailbox 消息数为0的 routee
    • 选择一个 mailbox 中 pending message 最少的 routee

这种 router 只有 pool 类型的, 因为要通过子 actor 的方式拿到 routee 的内部状态.

val router12: ActorRef =
  context.actorOf(SmallestMailboxPool(5).props(Props[Worker]), "router12")

Specially Handled Messages

大多数发送给 router actor 的消息都会根据 routing 逻辑转发(forward), 有少几个类型的消息需要特殊动作

  1. Broadcast Messages

    • router 收到这种类型的消息, 会将其转发给所有 routee
    router ! Broadcast(x)
    
  2. 管理型消息

    • Sending akka.routing.GetRoutees to a router actor will make it send back its currently used routees in a akka.routing.Routees message.
    • Sending akka.routing.AddRoutee to a router actor will add that routee to its collection of routees.
    • Sending akka.routing.RemoveRoutee to a router actor will remove that routee to its collection of routees.
    • Sending akka.routing.AdjustPoolSize to a pool router actor will add or remove that number of routees to its collection of routees.
class Parent extends Actor {
  val router: ActorRef =
    context.actorOf(RoundRobinPool(5).props(Props[ParentWorker]), "router2") 
  override def receive: Receive = {
    case "check" => router ! GetRoutees   // Management Messages
    case x:Routees => println(x.routees)  // router会返回GetRoutees的消息
  }
}

class ParentWorker extends Actor {
  override def receive: Receive = {
    case _ =>
  }
}

object TestRouter2 {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("TestRouter2")
    val p = system.actorOf(Props[Parent], "parent")
    p ! "check"
  }
}

Resizer

  1. default resizer
val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
val router30: ActorRef =
  context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]), "router30")

reference: https://github.com/lj72808up/Codes/blob/master/Actor_Demo/src/main/scala/com/test/TestRouter.scala

相关文章

网友评论

      本文标题:Akka Routing

      本文链接:https://www.haomeiwen.com/subject/uwvpmktx.html