A Router Actor
-
有时, 需要把相同类型的消息发给一个 actor 集合, 以便消息可以并行处理 - 单个 actor 一次只能处理一个消息
router
角色是一个自包含 router 对象的 actor 类, 可以管理其下的 routee -
有两种类型的 actor
-
Pool
: router 创建出routee
作为自己的子 actor. 当发现子 actor terminate 时, 将其从 router 中删除 -
Group
:routee
是在router
外部创建的, 并通过使用 actor selection 选择发送到哪个 routee, 无法监控其 termination.
-
-
Pool 的方式:
- 通过 router 发送消息给 routee 的方式和 发送给普通 actor 的方式没有差别, 只要将消息发送给 router 的
ActorRef
; router actor 会把消息发送给 routee 并不改变源发送者, 当 routee 回应消息时, 回应会直接发给源发送者, 而不是发给 router actor - 关于 sender
当 routee 使用sender() ! x
回复消息时, 消息会略过 router actor, 直接发送给 router 的父 actor.
但有时想让 routees 把消息回复给 router, 这样做的目的是隐藏 router 后面的 routee细节. 可以在 routee 中使用如下片段回复消息给 router
context.parent ! WorkerReplay()
- demo
/** * 第一种创建router的方式 (qucik start) , 同时测试 router 的消息透传 */ object TestRouter { def main(args: Array[String]): Unit = { val system = ActorSystem("MyTest") // akka://MyTest/system akka://MyTest/user val starter = system.actorOf(Props[Starter], "starter-Entrance") starter ! 10 // 告诉 master 发送10个消息给worker } } class Starter extends Actor { val master: ActorRef = context.actorOf(Props[Master], "master-Entrance") override def receive: Receive = { case cnt: Int => { println(s"发送${cnt}条消息给master") for (i <- 1 to cnt) { master ! Work() Thread.sleep(200) } } case _: MasterReplay => println(s"starter收到master的回复: " + sender().path) case _: WorkerReplay => println(s"starter收到worker的回复: " + sender().path) case _ => } } class Master extends Actor { //TODO 第一种创建router的方式 var router = { var i = 1 val routees = Vector.fill(5) { val r = context.actorOf(Props(classOf[Worker], i)) // 子actor i = i + 1 context.watch(r) ActorRefRoutee(r) } Router(RoundRobinRoutingLogic(), routees) } override def receive: Receive = { case w: Work => // println(s"master 收到消息") router.route(w, sender()) // println(s"发给 master 的 sender 是${sender().path}") // sender 是 starter case Terminated(a) => router = router.removeRoutee(a) val r = context.actorOf(Props[Worker]) context.watch(r) router = router.addRoutee(r) case _: WorkerReplay => { println("master 收到 worker 的回复") } } } class Worker(idx: Int) extends Actor { override def receive: Receive = { case _: Work => { println(s"worker " + idx + s" 号工作 (${self.path}), " + s"收到${sender().path}的消息") //TODO 切换下面两种 routee 回复消息的方式, 体会 router 的透传 // sender() ! WorkerReplay() // worker 会回复给started, router(master) 在消息回应时被隐藏 context.parent ! WorkerReplay() // worker 会回复给 router(master) } case _ => } } case class Work() case class MasterReplay() case class WorkerReplay()
- 通过 router 发送消息给 routee 的方式和 发送给普通 actor 的方式没有差别, 只要将消息发送给 router 的
-
Group 的方式:
-
routee
并不是 router 的子actor, 而是在外部产生的. 通过指定外部 routee 的 path
val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3") val router4: ActorRef = context.actorOf(RoundRobinGroup(paths).props(), "router4")
-
Router usage
-
像创建 actor 一样创建 router
/** * 第二种创建 router 的方式: 像普通actor一样创建 */ class Parent extends Actor { val router: ActorRef = context.actorOf(RoundRobinPool(5).props(Props[ParentWorker]), "router2") override def receive: Receive = { case x:Int => { for (_ <- 1 to x) { router ! Work() Thread.sleep(200) } } } } class ParentWorker extends Actor { override def receive: Receive = { case _: Work => println(s"worker收到${sender().path}的消息 -- [${self.path}]") case _ => } } object TestRouter2 { def main(args: Array[String]): Unit = { val system = ActorSystem("TestRouter2") val p = system.actorOf(Props[Parent], "parent") p ! 10 } }
-
SmallestMailboxPool
这种 actor 有限发送给未挂起的, mailbox 中消息最少的 routee .其选择以如下顺序进行:- 选择一个空闲的(未在处理消息), mailbox 消息数为0的 routee
- 选择一个 mailbox 消息数为0的 routee
- 选择一个 mailbox 中 pending message 最少的 routee
这种 router 只有 pool 类型的, 因为要通过子 actor 的方式拿到 routee 的内部状态.
val router12: ActorRef =
context.actorOf(SmallestMailboxPool(5).props(Props[Worker]), "router12")
Specially Handled Messages
大多数发送给 router actor 的消息都会根据 routing 逻辑转发(forward), 有少几个类型的消息需要特殊动作
-
Broadcast Messages
- router 收到这种类型的消息, 会将其转发给所有 routee
router ! Broadcast(x)
-
管理型消息
- Sending akka.routing.GetRoutees to a router actor will make it send back its currently used routees in a akka.routing.Routees message.
- Sending akka.routing.AddRoutee to a router actor will add that routee to its collection of routees.
- Sending akka.routing.RemoveRoutee to a router actor will remove that routee to its collection of routees.
- Sending akka.routing.AdjustPoolSize to a pool router actor will add or remove that number of routees to its collection of routees.
class Parent extends Actor {
val router: ActorRef =
context.actorOf(RoundRobinPool(5).props(Props[ParentWorker]), "router2")
override def receive: Receive = {
case "check" => router ! GetRoutees // Management Messages
case x:Routees => println(x.routees) // router会返回GetRoutees的消息
}
}
class ParentWorker extends Actor {
override def receive: Receive = {
case _ =>
}
}
object TestRouter2 {
def main(args: Array[String]): Unit = {
val system = ActorSystem("TestRouter2")
val p = system.actorOf(Props[Parent], "parent")
p ! "check"
}
}
Resizer
- default resizer
val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
val router30: ActorRef =
context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]), "router30")
reference: https://github.com/lj72808up/Codes/blob/master/Actor_Demo/src/main/scala/com/test/TestRouter.scala
网友评论