
Spark Source Code Analysis 12: The RPC Dispatcher

Author: 无色的叶 | Published 2020-07-28 14:58

    Dispatcher class diagram and main fields

    • endpoints is a ConcurrentMap[String, MessageLoop] that stores the mapping from endpoint name to its MessageLoop.
    • endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] holds the mapping from each RpcEndpoint to its RpcEndpointRef.
    • stopped flags whether the Dispatcher has been stopped.
    • sharedLoop is the SharedMessageLoop instance responsible for processing received messages.

    When the RPC server side starts, it registers its endpoints by calling setupEndpoint:

    override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)
      }
    

    This in turn calls the Dispatcher's registerRpcEndpoint method:

    def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        // Build the RpcEndpointAddress from the RpcEnv address and the endpoint name
        val addr = RpcEndpointAddress(nettyEnv.address, name)
        // Build the NettyRpcEndpointRef that will be handed back to callers
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
        synchronized {
          if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped")
          }
          if (endpoints.containsKey(name)) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
    
          // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
          // active when registering, and endpointRef must be put into endpointRefs before onStart is
          // called.
          // Record the endpoint -> endpointRef mapping
          endpointRefs.put(endpoint, endpointRef)
    
          var messageLoop: MessageLoop = null
          try {
            messageLoop = endpoint match {
              case e: IsolatedRpcEndpoint =>
                new DedicatedMessageLoop(name, e, this)
              case _ =>
                // SharedMessageLoop creates an Inbox for this endpoint and keeps the name -> endpoint mapping
                sharedLoop.register(name, endpoint)
                sharedLoop
            }
            endpoints.put(name, messageLoop)
          } catch {
            case NonFatal(e) =>
              endpointRefs.remove(endpoint)
              throw e
          }
        }
        endpointRef
      }
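    To make the registration flow concrete, here is a schematic example of defining and registering an endpoint. It is only a sketch: RpcEndpoint, RpcEnv and RpcEndpointRef are private[spark] APIs, so real code would live under the org.apache.spark package, and EchoEndpoint and the "echo" name are made up for illustration.

    import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

    // Hypothetical endpoint for illustration only
    class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      // Handle ask-style messages and reply through the RpcCallContext
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case msg: String => context.reply(s"echo: $msg")
      }
    }

    // rpcEnv: an existing RpcEnv instance (assumed to be available in scope).
    // setupEndpoint delegates to Dispatcher.registerRpcEndpoint as shown above
    // and returns the reference through which the endpoint can be reached.
    val echoRef: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    // Messages sent through the ref are routed back to the endpoint by the Dispatcher:
    val answer = echoRef.askSync[String]("hello")   // "echo: hello"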
    

    How the Dispatcher handles messages

    Whichever public path a message arrives through (postToAll, postRemoteMessage, postLocalMessage or postOneWayMessage), delivery ultimately goes through the postMessage method:

     private def postMessage(
          endpointName: String,
          message: InboxMessage,
          callbackIfStopped: (Exception) => Unit): Unit = {
        val error = synchronized {
          val loop = endpoints.get(endpointName)
          if (stopped) {
            Some(new RpcEnvStoppedException())
          } else if (loop == null) {
            Some(new SparkException(s"Could not find $endpointName."))
          } else {
            loop.post(endpointName, message)
            None
          }
        }
        // We don't need to call `onStop` in the `synchronized` block
        error.foreach(callbackIfStopped)
      }
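    Note the locking idiom: the error (if any) is computed while holding the Dispatcher's monitor, but callbackIfStopped runs outside the synchronized block. A generic sketch of the same idiom (illustrative only, not Spark source; the names are made up):

    // Sketch of the "decide under the lock, call back outside it" idiom used above.
    class GuardedDispatchSketch {
      private var stopped = false

      def dispatch(deliver: () => Unit)(onStopped: Exception => Unit): Unit = {
        val error = synchronized {
          if (stopped) Some(new IllegalStateException("stopped"))
          else { deliver(); None }
        }
        // The callback never runs while the monitor is held, so it cannot
        // deadlock against other synchronized methods of this class.
        error.foreach(onStopped)
      }

      def stop(): Unit = synchronized { stopped = true }
    }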
    

    The key step is loop.post(endpointName, message), which (for the shared loop) dispatches to SharedMessageLoop's post method. It is simple: the message is appended to the endpoint's Inbox, and the Inbox itself is then offered into the MessageLoop's active LinkedBlockingQueue so that a dispatcher thread will pick it up:

    override def post(endpointName: String, message: InboxMessage): Unit = {
        val inbox = endpoints.get(endpointName)
        inbox.post(message)
        setActive(inbox)
      }
    
     protected final def setActive(inbox: Inbox): Unit = active.offer(inbox)
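    The active queue is the hand-off point between producers and the dispatcher threads. A minimal standalone sketch of the same pattern (simplified; Inbox and the other names here are stand-ins, not Spark's classes):

    import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, LinkedBlockingQueue}

    // Sketch of the MessageLoop hand-off: post() appends a message to an inbox and
    // marks the inbox active; worker threads block on active.take() and drain
    // whichever inbox becomes active.
    object ActiveQueueSketch {
      final class Inbox(val name: String) {
        private val messages = new ConcurrentLinkedQueue[String]()
        def post(m: String): Unit = messages.add(m)
        def process(): Unit = {
          var m = messages.poll()
          while (m != null) { println(s"[$name] $m"); m = messages.poll() }
        }
      }

      private val endpoints = new ConcurrentHashMap[String, Inbox]()
      private val active = new LinkedBlockingQueue[Inbox]()

      def register(name: String): Unit = endpoints.put(name, new Inbox(name))

      def post(endpointName: String, message: String): Unit = {
        val inbox = endpoints.get(endpointName)   // assumed registered beforehand
        inbox.post(message)
        active.offer(inbox)                       // same idea as setActive(inbox) above
      }

      def receiveLoop(): Unit = while (true) active.take().process()
    }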
    

    SharedMessageLoop creates a fixed-size daemon thread pool ("dispatcher-event-loop", sized by getNumOfThreads(conf)) and submits receiveLoopRunnable to every thread, so each worker runs the receiveLoop method:

    /** Thread pool used for dispatching messages. */
      override protected val threadpool: ThreadPoolExecutor = {
        val numThreads = getNumOfThreads(conf)
        val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
        for (i <- 0 until numThreads) {
          pool.execute(receiveLoopRunnable)
        }
        pool
      }
    
    
    protected val receiveLoopRunnable = new Runnable() {
        override def run(): Unit = receiveLoop()
      }
    
    private def receiveLoop(): Unit = {
        try {
          while (true) {
            try {
              val inbox = active.take()
              if (inbox == MessageLoop.PoisonPill) {
                // Put PoisonPill back so that other threads can see it.
                setActive(MessageLoop.PoisonPill)
                return
              }
              inbox.process(dispatcher)
            } catch {
              case NonFatal(e) => logError(e.getMessage, e)
            }
          }
        } catch {
          case _: InterruptedException => // exit
          case t: Throwable =>
            try {
              // Re-submit a receive task so that message delivery will still work if
              // UncaughtExceptionHandler decides to not kill JVM.
              threadpool.execute(receiveLoopRunnable)
            } finally {
              throw t
            }
        }
      }
    

    PoisonPill is an empty Inbox that acts as a sentinel. When the Dispatcher is being stopped, the PoisonPill is fed to the receive loop: as soon as a thread-pool worker running receiveLoop takes the pill from the active queue, it puts the pill back so the remaining workers also see it, returns immediately, and its thread dies.
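    The poison-pill pattern itself is generic. A self-contained sketch (illustrative only, not the Spark source; names such as PoisonPillSketch are made up):

    import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

    // A sentinel is pushed onto the work queue; each worker that takes it puts it
    // back for the remaining workers and then exits its loop.
    object PoisonPillSketch {
      sealed trait Work
      final case class Job(payload: String) extends Work
      case object PoisonPill extends Work

      private val queue = new LinkedBlockingQueue[Work]()
      private val pool = Executors.newFixedThreadPool(3)

      private val worker: Runnable = () => {
        var running = true
        while (running) {
          queue.take() match {
            case PoisonPill =>
              queue.offer(PoisonPill) // put it back so other workers see it too
              running = false
            case Job(p) => println(s"processed $p")
          }
        }
      }

      def start(): Unit = (1 to 3).foreach(_ => pool.execute(worker))

      def stop(): Unit = {
        queue.offer(PoisonPill)
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS)
      }
    }

    In the normal case the worker takes a real Inbox and calls inbox.process(dispatcher) to handle its queued messages: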

     /**
       * Process stored messages.
       */
      def process(dispatcher: Dispatcher): Unit = {
        var message: InboxMessage = null
        inbox.synchronized {
          if (!enableConcurrent && numActiveThreads != 0) {
            return
          }
          message = messages.poll()
          if (message != null) {
            numActiveThreads += 1
          } else {
            return
          }
        }
        while (true) {
          safelyCall(endpoint) {
            message match {
              case RpcMessage(_sender, content, context) =>
                try {
                  endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                    throw new SparkException(s"Unsupported message $message from ${_sender}")
                  })
                } catch {
                  case e: Throwable =>
                    context.sendFailure(e)
                    // Throw the exception -- this exception will be caught by the safelyCall function.
                    // The endpoint's onError function will be called.
                    throw e
                }
    
              case OneWayMessage(_sender, content) =>
                endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
                  throw new SparkException(s"Unsupported message $message from ${_sender}")
                })
    
              case OnStart =>
                endpoint.onStart()
                if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
                  inbox.synchronized {
                    if (!stopped) {
                      enableConcurrent = true
                    }
                  }
                }
    
              case OnStop =>
                val activeThreads = inbox.synchronized { inbox.numActiveThreads }
                assert(activeThreads == 1,
                  s"There should be only a single active thread but found $activeThreads threads.")
                dispatcher.removeRpcEndpointRef(endpoint)
                endpoint.onStop()
                assert(isEmpty, "OnStop should be the last message")
    
              case RemoteProcessConnected(remoteAddress) =>
                endpoint.onConnected(remoteAddress)
    
              case RemoteProcessDisconnected(remoteAddress) =>
                endpoint.onDisconnected(remoteAddress)
    
              case RemoteProcessConnectionError(cause, remoteAddress) =>
                endpoint.onNetworkError(cause, remoteAddress)
            }
          }
    
          inbox.synchronized {
            // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
            // every time.
            if (!enableConcurrent && numActiveThreads != 1) {
              // If we are not the only one worker, exit
              numActiveThreads -= 1
              return
            }
            message = messages.poll()
            if (message == null) {
              numActiveThreads -= 1
              return
            }
          }
        }
      }
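    Each branch above is wrapped in safelyCall(endpoint), which is roughly a try/catch that routes non-fatal exceptions to the endpoint's onError hook instead of letting them kill the message-loop thread. A paraphrased sketch, not the exact Spark source (the real implementation also has special handling for fatal errors):

    import scala.util.control.NonFatal

    // Run the action; hand non-fatal failures to onError so the message loop keeps running.
    def safelyCallSketch(endpoint: RpcEndpoint)(action: => Unit): Unit = {
      try {
        action
      } catch {
        case NonFatal(e) =>
          try {
            endpoint.onError(e)
          } catch {
            case NonFatal(_) => // swallow so the loop stays alive (the real code logs it)
          }
      }
    }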
    

    This keeps the Dispatcher's responsibility narrow: it only routes messages to the right MessageLoop/Inbox. How a dispatched message is actually handled is the Inbox's concern: it takes care of draining the queued messages in batches, multi-thread safety, exception propagation, and branching on the different message types.

    The Outbox is the counterpart of the Inbox. The difference is locality: when the endpoint and the endpointRef are on the same machine, the message is put into the Inbox; otherwise it is put into an Outbox to be sent over the network.



    References:
    https://www.cnblogs.com/huanghanyu/p/12989074.html

    https://www.cnblogs.com/johnny666888/p/11128634.html
