Spark 2.0 RPC通信层设计原理分析

    Spark RPC层设计概况





    /**
     * Registers `endpoint` under `name` and returns the [[RpcEndpointRef]] that refers to it.
     *
     * @param name     name under which the endpoint is registered
     * @param endpoint the endpoint to register
     * @return a reference to the registered endpoint
     */
    def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef


    RpcEndpoint定义了RPC通信过程中的通信端对象,除了具有管理一个RpcEndpoint生命周期的操作(constructor -> onStart -> receive* -> onStop),还给出了通信过程中一个RpcEndpoint所具有的基于事件驱动的行为(连接、断开、网络异常)。实际上对于Spark框架来说,RpcEndpoint主要负责接收消息并进行处理。

    /**
     * Handles one-way messages delivered to this endpoint.
     *
     * This default implementation matches every message and throws a
     * SparkException reporting that the endpoint does not implement 'receive'.
     */
    def receive: PartialFunction[Any, Unit] = {
      case _ => throw new SparkException(self + " does not implement 'receive'")
    }
    /**
     * Handles messages that expect a reply, answering through `context`.
     *
     * This default implementation matches every message and reports a failure
     * to the caller via `context.sendFailure`.
     *
     * @param context call context used to send the reply or failure back to the sender
     */
    def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
    }




    private[spark] abstract class RpcEndpointRef(conf: SparkConf)  extends Serializable with Logging { 
      private[this] val maxRetries = RpcUtils.numRetries(conf)
      private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
      private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
      def address: RpcAddress
      def name: String
      def send(message: Any): Unit
      def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
      def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
      def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
      def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
        ... ...


    Driver Spark Env中NettyRpcEnv创建

    Driver Spark Env是Spark Application中Driver的运行环境,其需要创建很多组件,比如SecurityManager、rpcEnv、broadcastManager、mapOutputTracker、memoryManager、blockTransferService、blockManagerMaster、blockManager、metricsSystem等。由于本文是介绍Spark RPC机制的,故只介绍rpcEnv的创建过程及服务启动过程。下面从NettyRpcEnv.scala中NettyRpcEnvFactory的create方法说起。

    private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
      def create(config: RpcEnvConfig): RpcEnv = {
        val sparkConf = config.conf
        val javaSerializerInstance = new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
        //new 一个NettyRpcEnv实例
        val nettyEnv =  new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
        if (!config.clientMode) {
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = {
            actualPort => nettyEnv.startServer(actualPort)
            (nettyEnv, nettyEnv.address.port)
          try {
            // 根据指定的端口号和主机,启动Driver Rpc服务
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
          catch {
            case NonFatal(e) =>
              throw e


    1. 创建NettyRpcEnv
    private[netty] class NettyRpcEnv(val conf: SparkConf, javaSerializerInstance: JavaSerializerInstance, host: String, securityManager: SecurityManager)  extends RpcEnv(conf) with Logging {  
      // 创建transportConf
      private[netty] val transportConf = SparkTransportConf.fromSparkConf(conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"), "rpc", conf.getInt("spark.rpc.io.threads", 0))
      private val dispatcher: Dispatcher = new Dispatcher(this)
      private val streamManager = new NettyStreamManager(this)  
      private val transportContext = new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this, streamManager))
      private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
        if (securityManager.isAuthenticationEnabled()) {
          java.util.Arrays.asList(new SaslClientBootstrap(transportConf, "", securityManager,        securityManager.isSaslEncryptionEnabled()))
        } else {
      // 声明一个clientFactory,用于创建通信的客户端
      private val clientFactory = transportContext.createClientFactory(createClientBootstraps())  
      * A separate client factory for file downloads. This avoids using the same RPC handler as   
      * the main RPC context, so that events caused by these clients are kept isolated from the   
      * main RPC traffic.   
      * It also allows for different configuration of certain properties, such as the number of   
      * connections per peer.   
      @volatile private var fileDownloadFactory: TransportClientFactory = _  
      val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
      // Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool  
      // to implement non-blocking send/ask.  
      // TODO: a non-blocking TransportClientFactory.createClient in future
      private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool( "netty-rpc-connection",    conf.getInt("spark.rpc.connect.threads", 64))
      @volatile private var server: TransportServer = _  
      private val stopped = new AtomicBoolean(false)  
      * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],   
      * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.  
      private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()  
      * Remove the address's Outbox and stop it.   
      private[netty] def removeOutbox(address: RpcAddress): Unit = {    
        val outbox = outboxes.remove(address)    
        if (outbox != null) {      
      def startServer(port: Int): Unit = {    
        val bootstraps: java.util.List[TransportServerBootstrap] =
        if(securityManager.isAuthenticationEnabled()) {  
          java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager)) 
        } else {   
        server = transportContext.createServer(host, port, bootstraps)
        dispatcher.registerRpcEndpoint(RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))  
      @Nullable  override lazy val address: RpcAddress = {    
        if (server != null) 
          RpcAddress(host, server.getPort()) 
      override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)  
      def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {    
        val addr = RpcEndpointAddress(uri)    
        val endpointRef = new NettyRpcEndpointRef(conf, addr, this)    
        val verifier = new NettyRpcEndpointRef(conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)    
        verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(endpointRef.name)).flatMap { find =>  if (find) {  Future.successful(endpointRef) } else { Future.failed(new RpcEndpointNotFoundException(uri)) } }(ThreadUtils.sameThread)  
      override def stop(endpointRef: RpcEndpointRef): Unit = {
      private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {    
        if (receiver.client != null) {  
        } else {      
          require(receiver.address != null, "Cannot send message to client endpoint with no listen address.")      
          val targetOutbox = { 
            val outbox = outboxes.get(receiver.address)        
            if (outbox == null) {          
              val newOutbox = new Outbox(this, receiver.address)          
              val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)          
              if (oldOutbox == null) { 
              } else {            
            } else {          
          if (stopped.get) {       
           // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
          } else {        
      private[netty] def send(message: RequestMessage): Unit = {    
        val remoteAddr = message.receiver.address    
        if (remoteAddr == address) {      
          // Message to a local RPC endpoint.      
          try {        
          catch {        
            case e: RpcEnvStoppedException => logWarning(e.getMessage)      
        } else {      
          // Message to a remote RPC endpoint.      
          postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))    
      private[netty] def createClient(address: RpcAddress): TransportClient = { clientFactory.createClient(address.host, address.port)  }  
      private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
        val promise = Promise[Any]()    
        val remoteAddr = message.receiver.address    
        def onFailure(e: Throwable): Unit = {      
          if (!promise.tryFailure(e)) {        
            logWarning(s"Ignored failure: $e")      
        def onSuccess(reply: Any): Unit = reply match {      
          case RpcFailure(e) => onFailure(e)      
          case rpcReply => if (!promise.trySuccess(rpcReply)) { logWarning(s"Ignored message: $reply") }
          try {      
            if (remoteAddr == address) {        
              val p = Promise[Any]()        
              p.future.onComplete {          
                case Success(response) => onSuccess(response)          
                case Failure(e) => onFailure(e)        
              dispatcher.postLocalMessage(message, p)      
            } else {        
             val rpcMessage = RpcOutboxMessage(serialize(message), onFailure, (client, response) => onSuccess(deserialize[Any](client, response)))
             postToOutbox(message.receiver, rpcMessage)        
             promise.future.onFailure {
               case _: TimeoutException => rpcMessage.onTimeout()          
               case _ =>        
           val timeoutCancelable = timeoutScheduler.schedule(new Runnable { 
             override def run(): Unit = {          
               onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}")) 
            }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
            promise.future.onComplete { v =>  
            } catch {      
              case NonFatal(e) => onFailure(e)    
      private[netty] def serialize(content: Any): ByteBuffer = {
      private[netty] def deserialize[T: ClassTag](client: TransportClient, bytes: ByteBuffer): T = {    
        NettyRpcEnv.currentClient.withValue(client) {      
          deserialize { 
            () =>  javaSerializerInstance.deserialize[T](bytes)      
      override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
      override def shutdown(): Unit = {    
      override def awaitTermination(): Unit = {    
      private def cleanup(): Unit = {    
        if (!stopped.compareAndSet(false, true)) {      
        val iter = outboxes.values().iterator()    
        while (iter.hasNext()) {      
          val outbox = iter.next()      
        if (timeoutScheduler != null) {      
        if (dispatcher != null) {      
        if (server != null) {     
        if (clientFactory != null) {      
        if (clientConnectionExecutor != null) {      
        if (fileDownloadFactory != null) {      
      override def deserialize[T](deserializationAction: () => T): T = {
        NettyRpcEnv.currentEnv.withValue(this) {      
      override def fileServer: RpcEnvFileServer = streamManager  
      override def openChannel(uri: String): ReadableByteChannel = {    
        val parsedUri = new URI(uri)    
        require(parsedUri.getHost() != null, "Host name must be defined.")
        require(parsedUri.getPort() > 0, "Port must be defined.")    
        require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")    
        val pipe = Pipe.open()    
        val source = new FileDownloadChannel(pipe.source())    
        try {      
          val client = downloadClient(parsedUri.getHost(), parsedUri.getPort())      
          val callback = new FileDownloadCallback(pipe.sink(), source, client)
          client.stream(parsedUri.getPath(), callback)    
        } catch {      
          case e: Exception =>  
            throw e    
      private def downloadClient(host: String, port: Int): TransportClient = {    
        if (fileDownloadFactory == null) 
          synchronized {      
            if (fileDownloadFactory == null) {        
              val module = "files"        
              val prefix = "spark.rpc.io."        
              val clone = conf.clone()        
              // Copy any RPC configuration that is not overridden in the spark.files namespace.        
              conf.getAll.foreach { 
                case (key, value) => 
                  if (key.startsWith(prefix)) {            
                    val opt = key.substring(prefix.length())
                    clone.setIfMissing(s"spark.$module.io.$opt", value)          
              val ioThreads = clone.getInt("spark.files.io.threads", 1)        
              val downloadConf = SparkTransportConf.fromSparkConf(clone, module, ioThreads)        
              val downloadContext = new TransportContext(downloadConf, new NoOpRpcHandler(), true)        
              fileDownloadFactory = downloadContext.createClientFactory(createClientBootstraps())      
        fileDownloadFactory.createClient(host, port)  
      private class FileDownloadChannel(source: ReadableByteChannel) extends ReadableByteChannel {    
        @volatile private var error: Throwable = _    
        def setError(e: Throwable): Unit = {      error = e      source.close()    }    
        override def read(dst: ByteBuffer): Int = {      
          Try(source.read(dst)) match {        
            case Success(bytesRead) => bytesRead        
            case Failure(readErr) =>          
              if (error != null) {            
               throw error          
              } else {            
                throw readErr          
          override def close(): Unit = source.close()    
          override def isOpen(): Boolean = source.isOpen()  
      private class FileDownloadCallback(sink: WritableByteChannel, source: FileDownloadChannel, client: TransportClient) extends StreamCallback {    
        override def onData(streamId: String, buf: ByteBuffer): Unit = {      
          while (buf.remaining() > 0) {        
        override def onComplete(streamId: String): Unit = {      
        override def onFailure(streamId: String, cause: Throwable): Unit = {      
          logDebug(s"Error downloading stream $streamId.", cause)


    1.1 Dispatcher介绍


    private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
      // Dispatcher的内部类,主要是为每个endpoint声明一个对应的Inbox
      private class EndpointData(val name: String,  val endpoint: RpcEndpoint,  val ref:   NettyRpcEndpointRef) {
        val inbox = new Inbox(ref, endpoint)  
      // 维护一个HashMap,保存Name与EndpointData的关系
      private val endpoints = new ConcurrentHashMap[String, EndpointData]  
      // 维护一个HashMap,保存RpcEndpoint与RpcEndpointRef的关系
      private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
      // Track the receivers whose inboxes may contain messages.  
      private val receivers = new LinkedBlockingQueue[EndpointData]  
      * True if the dispatcher has been stopped. Once stopped, all messages posted will be bounced immediately.   
      @GuardedBy("this")  private var stopped = false  
      // 根据Name和RPCEndpoint,在RpcEnv上进行注册
      def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        val addr = RpcEndpointAddress(nettyEnv.address, name)
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
        synchronized {      
          if (stopped) {        
            throw new IllegalStateException("RpcEnv has been stopped")      
          if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          val data = endpoints.get(name)
          endpointRefs.put(data.endpoint, data.ref)
          // for the OnStart message
      def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)
      def removeRpcEndpointRef(endpoint: RpcEndpoint): Unit = endpointRefs.remove(endpoint)  
      // Should be idempotent  private
      // 根据Name,取消其在NettyRpcEnv中注册的endpoint
      def unregisterRpcEndpoint(name: String): Unit = { 
        val   data = endpoints.remove(name)
        if (data != null) {
          // for the OnStop message    
        // Don't clean `endpointRefs` here because it's possible that some messages are being processed    
        // now and they can use `getRpcEndpointRef`. So `endpointRefs` will be cleaned in Inbox via    
        // `removeRpcEndpointRef`.  
      def stop(rpcEndpointRef: RpcEndpointRef): Unit = {    
        synchronized {      
          if (stopped) {
            // This endpoint will be stopped by Dispatcher.stop() method.        
      * Send a message to all registered [[RpcEndpoint]]s in this process.   
      * This can be used to make network events known to all end points (e.g. "a new node connected").   
      def postToAll(message: InboxMessage): Unit = {    
        val iter = endpoints.keySet().iterator()    
        while (iter.hasNext) {      
          val name = iter.next      
          postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))    
      /** Posts a message sent by a remote endpoint. */
      def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
        val rpcCallContext =  new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
        val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)    
        postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))  
      /** Posts a message sent by a local endpoint. */
      def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {    
        val rpcCallContext = new LocalNettyRpcCallContext(message.senderAddress, p)    
        val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)    
        postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))  
      /** Posts a one-way message. */  
      def postOneWayMessage(message: RequestMessage): Unit = {
        postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),      (e) => throw e)  
      * Posts a message to a specific endpoint.   
      * @param endpointName name of the endpoint.   
      * @param message the message to post   
      * @param callbackIfStopped callback function if the endpoint is stopped.   
      private def postMessage(endpointName: String,  message: InboxMessage,  callbackIfStopped: (Exception) => Unit): Unit = {
        val error = synchronized { 
          // 根据endpointName获得对应的endpointData
          val data = endpoints.get(endpointName)
          if (stopped) {
            Some(new RpcEnvStoppedException())
          } else if (data == null) {
            Some(new SparkException(s"Could not find $endpointName."))
          } else {
        // We don't need to call `onStop` in the `synchronized` block
      def stop(): Unit = {    
        synchronized {      
          if (stopped) {        
          stopped = true    
        // Stop all endpoints. This will queue all endpoints for processing by the message loops.    
        // Enqueue a message that tells the message loops to stop.    receivers.offer(PoisonPill)    
      def awaitTermination(): Unit = {    
        threadpool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)  
      * Return if the endpoint exists   
      def verify(name: String): Boolean = {    endpoints.containsKey(name)  }  
      /** Thread pool used for dispatching messages. */
      private val threadpool: ThreadPoolExecutor = {
        val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",      math.max(2, Runtime.getRuntime.availableProcessors()))
        val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
        for (i <- 0 until numThreads) {      
          pool.execute(new MessageLoop)    
      /** Message loop used for dispatching messages. */  
      private class MessageLoop extends Runnable {    
        override def run(): Unit = {      
          try {        
            while (true) {
              try {
                val data = receivers.take()
                //获取的元素如果是PoisonPill,将停止该线程,同时 将PoisonPill继续放回receivers中,以便停止所有线程
                if (data == PoisonPill) {
                // Put PoisonPill back so that other MessageLoops can see it.
              } catch {
                case NonFatal(e) => logError(e.getMessage, e)
          } catch {
            case ie: InterruptedException => // exit
      /** A poison endpoint that indicates MessageLoop should exit its message loop. */  
      private val PoisonPill = new EndpointData(null, null, null)}


    1.2 Inbox
    private[netty] class Inbox(val endpointRef: NettyRpcEndpointRef,  val endpoint: RpcEndpoint)  extends Logging {  
    inbox =>  
      // Give this an alias so we can use it more clearly in closures.
      // 声明一个InboxMessage类型的LinkedList,命名为messages
      @GuardedBy("this")  protected val messages = new java.util.LinkedList[InboxMessage]()
      /** True if the inbox (and its associated endpoint) is stopped. */  
      @GuardedBy("this")  private var stopped = false  
      /** Allow multiple threads to process messages at the same time. */
      @GuardedBy("this")  private var enableConcurrent = false  
      /** The number of threads processing messages for this inbox. */
      @GuardedBy("this")  private var numActiveThreads = 0  
      // OnStart should be the first message to process
      inbox.synchronized {
      * Process stored messages.   
      def process(dispatcher: Dispatcher): Unit = {    
        var message: InboxMessage = null    
        inbox.synchronized {      
          if (!enableConcurrent && numActiveThreads != 0) {        
          message = messages.poll()
          if (message != null) {
            numActiveThreads += 1 
          } else {
        while (true) {      
          safelyCall(endpoint) {        
            message match {          
              case RpcMessage(_sender, content, context) =>  
                try {              
                  endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>                
                    throw new SparkException(s"Unsupported message $message from ${_sender}")              
                } catch {              
                  case NonFatal(e) =>                
                    // Throw the exception -- this exception will be caught by the safelyCall function.  
                    // The endpoint's onError function will be called.                
                      throw e            
              case OneWayMessage(_sender, content) =>
                endpoint.receive.applyOrElse[Any, Unit](content, { msg =>              
                 throw new SparkException(s"Unsupported message $message from ${_sender}")            
              case OnStart =>            
                if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {              
                  inbox.synchronized {                
                    if (!stopped) {                  
                      enableConcurrent = true               
              case OnStop =>            
                val activeThreads = inbox.synchronized { inbox.numActiveThreads }
                assert(activeThreads == 1,              
                  s"There should be only a single active thread but found $activeThreads threads.")            
                assert(isEmpty, "OnStop should be the last message")          
              case RemoteProcessConnected(remoteAddress) =>
              case RemoteProcessDisconnected(remoteAddress) =>
              case RemoteProcessConnectionError(cause, remoteAddress) =>
                endpoint.onNetworkError(cause, remoteAddress)        
          inbox.synchronized {        
            // "enableConcurrent" will be set to false after `onStop` is called, so we should check it  every time.        
            if (!enableConcurrent && numActiveThreads != 1) {          
              // If we are not the only one worker, exit          
              numActiveThreads -= 1          
            message = messages.poll()        
            if (message == null) {          
              numActiveThreads -= 1          
      def post(message: InboxMessage): Unit = inbox.synchronized {
        if (stopped) {      
          // We already put "OnStop" into "messages", so we should drop further messages
        } else {      
      def stop(): Unit = inbox.synchronized   {    
        // The following codes should be in `synchronized` so that we can make sure "OnStop" is the last    
        // message    
        if (!stopped) {      
          // We should disable concurrent here. Then when RpcEndpoint.onStop is called, it's the only      
          // thread that is processing messages. So `RpcEndpoint.onStop` can release its resources      
          // safely.      
          enableConcurrent = false      
          stopped = true      
          // Note: The concurrent events in messages will be processed one by one.    
      def isEmpty: Boolean = inbox.synchronized { messages.isEmpty }  
      * Called when we are dropping a message. Test cases override this to test message dropping.   
      * Exposed for testing.   
      protected def onDrop(message: InboxMessage): Unit = {    
        logWarning(s"Drop $message because $endpointRef is stopped")  
      * Calls action closure, and calls the endpoint's onError function in the case of exceptions.   
      private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {    
        try action catch {      
          case NonFatal(e) =>        
            try endpoint.onError(e) catch {          
              case NonFatal(ee) => logError(s"Ignoring error", ee)        


    1. 创建Dispatcher
    • 声明线程组,并监控receivers是否有新的EndpointData
      • 如果有消息,并且不为PoisonPill,调用相应EndpointData的Inbox的process方法进行消息处理
        1). 依次从相应的EndpointData的inbox的messages中获取第一个元素
        2). 匹配消息,并调用对应的endpoint的相应方法进行处理
      • 如果没有消息,则阻塞等待
      • 如果有消息,但是为PoisonPill,则将PoisonPill继续添加到receivers中,然后停止该线程
    2. 根据name和endpoint,在NettyRpcEnv中进行注册
    • 根据nettyEnv.conf、RpcEndpointAddress和nettyEnv创建对应的NettyRpcEndpointRef
    • 根据name、endpoint、endpointRef创建新的EndpointData
    • 将name -> EndpointData添加到endpoints中
    • 将endpoint -> endpointRef添加到endpointRefs中
    • 将新建的EndpointData添加到receivers中
    3. 将InboxMessage消息分发到相应的EndpointData中进行处理
    • 根据Name获取EndpointData
    • 将Message添加到EndpointData的Inbox的messages中
    • 将EndpointData添加到receivers中


    1.3 NettyRpcEndpointRef

    private[netty] class NettyRpcEndpointRef( @transient private val conf: SparkConf,    endpointAddress: RpcEndpointAddress,    @transient @volatile private var nettyEnv: NettyRpcEnv)  extends RpcEndpointRef(conf) with Serializable with Logging { 
      @transient @volatile var client: TransportClient = _ 
      private val _address = if (endpointAddress.rpcAddress != null) endpointAddress else null
      private val _name = endpointAddress.name
      override def address: RpcAddress = if (_address != null) _address.rpcAddress else null
      private def readObject(in: ObjectInputStream): Unit = {    
        nettyEnv = NettyRpcEnv.currentEnv.value    
        client = NettyRpcEnv.currentClient.value  
      private def writeObject(out: ObjectOutputStream): Unit = {    
      override def name: String = _name  
      override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
        nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout)  
      override def send(message: Any): Unit = {    
        require(message != null, "Message is null")
        nettyEnv.send(RequestMessage(nettyEnv.address, this, message))  
      override def toString: String = s"NettyRpcEndpointRef(${_address})"  
      def toURI: URI = new URI(_address.toString)  
      final override def equals(that: Any): Boolean = that match {    
        case other: NettyRpcEndpointRef => _address == other._address    
        case _ => false  
      final override def hashCode(): Int = if (_address == null) 0 else _address.hashCode()}

    至此,Spark RPC通信模块中的NettyRpcEnv、NettyRpcEndpoint、NettyRpcEndpointRef已经全部梳理完成。



