美文网首页程序员
Akka中的Scheduler

Akka中的Scheduler

作者: _张逸_ | 来源:发表于2016-07-21 15:28 被阅读2125次

    Actor之间进行通信时,通常都会立即发送消息,然后即刻返回,不会阻塞。但是,AKKA也支持我们对Actor设置Scheduler,用以控制发送消息的次数和时间。

    Scheduler由ActorSystem提供,可以对Actor的消息发送进行调度。可以通过ActorContext获得当前的ActorSystem,进而获得Scheduler。如下方法是定期在5秒后执行一次:

    import scala.concurrent.ExecutionContext
    import scala.concurrent.duration._
    
    import context.dispatcher
    context.system.scheduler.scheduleOnce(5 seconds, targetActorRef, RequestMessage)
    

    AKKA还支持消息的定期重复执行,方法为schedule()。例如设置在2秒后,间隔5秒重复执行:

    import context.dispatcher
    context.system.scheduler.schedule(2 seconds, 5 seconds, targetActorRef, RequestMessage)
    

    如果传入schedule函数的第一个参数为0 seconds,则表示消息会及时发送,然后每隔5秒钟再重复发送。

    注意,在上述的两段代码中,调用schedule()或scheduleOnce()方法之前都必须导入context.dispatcher,因为这两个方法都定义了类型为ExecutionContext的隐式参数,如下为AKKA中schedule函数的定义:

      final def schedule(
        initialDelay: FiniteDuration,
        interval: FiniteDuration,
        receiver: ActorRef,
        message: Any)(implicit executor: ExecutionContext,
                      sender: ActorRef = Actor.noSender): Cancellable =
        schedule(initialDelay, interval, new Runnable {
          def run = {
            receiver ! message
            if (receiver.isTerminated)
              throw new SchedulerException("timer active for terminated actor")
          }
        })
    

    如果调用schedule()方法,只要不终止context或者actor,消息就会一直被发送。如果希望指定发送消息的条数,就无能为力了。例如,我希望每5秒钟发送一次消息,一共发送100次,然后终止receiver。期望的调用为:

    context.system.scheduler.schedule(0 seconds, 5 seconds, 100, targetActorRef, RequestMessage)
    

    好在AKKA已经将基本的调度工作完成了,我们可以自定义这样的接口,在重用现有schedule()方法的基础上,引入执行次数。为了让接口更加自然,我们可以通过引入隐式转换为Scheduler类增加这个新的方法,实现代码如下:

      implicit class SchedulerWrapper(scheduler: Scheduler) {
        def schedule(initialDelay: FiniteDuration,
                     interval: FiniteDuration,
                     repeatTimes: Int,
                     receiver: ActorRef,
                     message: Any)(implicit executor: ExecutionContext,
                                   sender: ActorRef = Actor.noSender): Cancellable = {
          var count = 0
          var repeatScheduler: Option[Cancellable] = None
          repeatScheduler = Some(
            scheduler.schedule(initialDelay, interval) {
              receiver ! message
              count += 1
              if (receiver.isTerminated && count < repeatTimes)
                throw new Exception("timer active for terminated actor")
    
              if (count >= repeatTimes) {
                repeatScheduler.foreach(_.cancel())
              }
          })
          repeatScheduler.get
        }
      }
    

    一旦发送消息的次数超过指定的repeatTimes,schedule的过程就会被取消。

    即使与Actor无关,也可以利用Scheduler来执行一些需要反复重试的工作。例如,在Spark中,AppClient中的ClientActor需要与Master这个Remote Actor通信,从而注册所有的Spark Master。由于注册过程中牵涉到远程通信,可能会因为网络原因导致通信错误,因此需要引入重试的机会。代码实现为:

      private val REGISTRATION_TIMEOUT = 20.seconds
      private val REGISTRATION_RETRIES = 3
    
       def tryRegisterAllMasters() {
          for (masterAkkaUrl <- masterAkkaUrls) {
            logInfo("Connecting to master " + masterAkkaUrl + "...")
            val actor = context.actorSelection(masterAkkaUrl)
            actor ! RegisterApplication(appDescription)
          }
        }
    
        def registerWithMaster() {
          tryRegisterAllMasters()
          import context.dispatcher
          var retries = 0
          registrationRetryTimer = Some {
            context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
              Utils.tryOrExit {
                retries += 1
                if (registered) {
                  registrationRetryTimer.foreach(_.cancel())
                } else if (retries >= REGISTRATION_RETRIES) {
                  markDead("All masters are unresponsive! Giving up.")
                } else {
                  tryRegisterAllMasters()
                }
              }
            }
          }
        }
    

    tryRegisterAllMasters()方法会通过masterAkkUrl获得remote actor(即Master),然后发送RegisterApplication消息。Master收到该消息后,会进行注册工作,具体代码如下:

        case RegisterApplication(description) => {
          if (state == RecoveryState.STANDBY) {
            // ignore, don't send response
          } else {
            logInfo("Registering app " + description.name)
            val app = createApplication(description, sender)
            registerApplication(app)
            logInfo("Registered app " + description.name + " with ID " + app.id)
            persistenceEngine.addApplication(app)
            sender ! RegisteredApplication(app.id, masterUrl)
            schedule()
          }
        }
    

    Master中的sender实际就是ClientActor,在注册成功后,通过sender发送RegisteredApplication消息。在ClientActor中的实现为:

        override def receiveWithLogging: PartialFunction[Any, Unit] = {
          case RegisteredApplication(appId_, masterUrl) =>
            appId = appId_
            registered = true
            changeMaster(masterUrl)
            listener.connected(appId)
            //其他代码
        }
    

    收到RegisteredApplicaition消息即意味着注册成功,则标志变量registered就设置为true。在结合前面代码中的registerWithMaster()方法。如果registered为true,就会取消。对应代码为:

    registrationRetryTimer.foreach(_.cancel())
    

    schedule()返回的类型的类型为Cancellable,因此,registrationRetryTimer类型就为Some[Cancellable]。如果registered为false,就继续尝试去注册,直到注册成功或者重试次数超过指定次数。

    相关文章

      网友评论

        本文标题:Akka中的Scheduler

        本文链接:https://www.haomeiwen.com/subject/nupbjttx.html