美文网首页spark
spark通信-源码分析

spark通信-源码分析

作者: 专职掏大粪 | 来源:发表于2021-09-18 14:54 被阅读0次

    driver的网络通信

    -SparkContext.SparkEnv
    // SparkContext exposes its SparkEnv through this accessor; the backing field _env
    // is assigned from createSparkEnv(_conf, isLocal, listenerBus).
    private[spark] def env: SparkEnv = _env
    _env = createSparkEnv(_conf, isLocal, listenerBus)

    // In SparkEnv.create: build this process's RpcEnv. port.getOrElse(-1) passes -1 when
    // no port was configured. The final argument is RpcEnv.create's clientMode flag
    // (true for non-driver processes); NettyRpcEnvFactory checks it before starting a server.
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

    // Next step in the call chain: the RpcEnvConfig is handed to the Netty-based factory.
    new NettyRpcEnvFactory().create(config)

    // Inside NettyRpcEnvFactory.create: construct the Netty-backed RpcEnv from the config.
    val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
    config.securityManager, config.numUsableCores)

    
    /**
     * Factory that builds the Netty-based RpcEnv.
     * (Excerpt: `... ...` marks code elided by the article.)
     */
    private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
    
      /**
       * Creates a NettyRpcEnv. Unless config.clientMode is set, it also starts a server
       * via Utils.startServiceOnPort. The env is returned in both cases.
       */
      def create(config: RpcEnvConfig): RpcEnv = {
      ... ...
        val nettyEnv =
          new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
            config.securityManager, config.numUsableCores)
        // In client mode no server is started, so this env does not listen on a port.
        if (!config.clientMode) {
          // Callback for startServiceOnPort: start the server on the given port and
          // return the env together with the port reported by nettyEnv.address.
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
          }
          try {
            // Start the service on the configured port.
            // NOTE(review): startServiceOnPort presumably retries other ports on bind
            // failure - confirm. The `._1` result is discarded; nettyEnv is returned below.
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
          } catch {
            ... ...
          }
        }
        nettyEnv
      }
    }
    

    -nettyEnv.startServer(config.bindAddress, actualPort)

    // Starts the transport server on bindAddress:port, then registers the
    // RpcEndpointVerifier endpoint. (Excerpt: `...  ...` marks elided code.)
    def startServer(bindAddress: String, port: Int): Unit = {
       ...  ...
        server = transportContext.createServer(bindAddress, port, bootstraps)
        // Register the communication endpoint: an RpcEndpointVerifier under RpcEndpointVerifier.NAME.
        dispatcher.registerRpcEndpoint(
          RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
      }
    

    RpcEndpoint 用于接收数据，对应 receive* 系列方法（receive / receiveAndReply）：

    // Default handler for messages that come without a reply context.
    // This default throws SparkException for every message; endpoints override it.
    def receive: PartialFunction[Any, Unit] = {
       case _ => throw new SparkException(self + " does not implement 'receive'")
     }
    
     // Default handler for messages that carry an RpcCallContext to reply on.
     // This default answers every message with a failure via context.sendFailure.
     def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
     }
    

    RpcEndpointRef 用于发送数据，对应 send 或 ask 方法：

     // Returns the RpcEndpointRef for this endpoint, looked up from rpcEnv.
     // Fails the require check if rpcEnv is still null (not yet initialized).
     final def self: RpcEndpointRef = {
        require(rpcEnv != null, "rpcEnv has not been initialized")
        rpcEnv.endpointRef(this)
      }
    

    需要注意的是，RpcEndpoint 还有一个收件箱（Inbox）的概念，即待处理的消息列表（message list）。

    
    /**
     * Registers `endpoint` under `name` and returns a NettyRpcEndpointRef addressed at
     * RpcEndpointAddress(nettyEnv.address, name).
     *
     * @throws IllegalStateException if the RpcEnv has been stopped
     * @throws IllegalArgumentException if an endpoint called `name` is already registered
     */
    def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        val addr = RpcEndpointAddress(nettyEnv.address, name)
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
        // The stopped/duplicate checks and the inserts below run under one lock.
        synchronized {
          if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped")
          }
          if (endpoints.containsKey(name)) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
    
          // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
          // active when registering, and endpointRef must be put into endpointRefs before onStart is
          // called.
          endpointRefs.put(endpoint, endpointRef)
          // Choose the message loop (inbox processing) for this endpoint: an IsolatedRpcEndpoint
          // gets its own DedicatedMessageLoop; every other endpoint is registered on sharedLoop.
          var messageLoop: MessageLoop = null
          try {
            messageLoop = endpoint match {
              case e: IsolatedRpcEndpoint =>
                new DedicatedMessageLoop(name, e, this)
              case _ =>
                sharedLoop.register(name, endpoint)
                sharedLoop
            }
            endpoints.put(name, messageLoop)
          } catch {
            // Undo the endpointRefs entry added above, then rethrow.
            case NonFatal(e) =>
              endpointRefs.remove(endpoint)
              throw e
          }
        }
        endpointRef
      }
    

    // Holds the registered endpoints (see dispatcher.registerRpcEndpoint in startServer).
    // NOTE(review): numUsableCores presumably sizes its shared message-loop threads - confirm.
    private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
    // Outboxes (outgoing message queues), keyed by remote RpcAddress, so there can be several.
    private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

    -transportContext.createServer(bindAddress, port, bootstraps)
    -new TransportServer(this, host, port, rpcHandler, bootstraps)
    //使用 ip:port 启动服务
    -TransportServer.init

    // Netty server bootstrap (from TransportServer.init): bossGroup is the parent group and
    // workerGroup the child group, per ServerBootstrap.group(parent, child).
    bootstrap = new ServerBootstrap()
         .group(bossGroup, workerGroup)
         // Channel implementation chosen from ioMode (NIO or EPOLL), see NettyUtils.
         .channel(NettyUtils.getServerChannelClass(ioMode))
         .option(ChannelOption.ALLOCATOR, pooledAllocator)
         // SO_REUSEADDR is enabled on every OS except Windows.
         .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
         // Accepted child channels use the same pooled allocator as the server channel.
         .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
    
    //NettyUtils
     /**
      * Returns the correct ServerSocketChannel class based on IOMode.
      *
      * @param mode the configured I/O mode
      * @return NioServerSocketChannel for NIO, EpollServerSocketChannel for EPOLL
      * @throws IllegalArgumentException for any other mode
      */
     public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
       switch (mode) {
         case NIO:
           return NioServerSocketChannel.class;
         case EPOLL:
           // Netty's epoll-based server channel.
           // NOTE(review): epoll is readiness-based I/O multiplexing, not true async (AIO) I/O.
           return EpollServerSocketChannel.class;
         default:
           throw new IllegalArgumentException("Unknown io mode: " + mode);
       }
     }
    

    executor 的网络通信

    CoarseGrainedExecutorBackend[object]
    -val env = SparkEnv.createExecutorEnv
    -SparkEnv.create
    -val rpcEnv = RpcEnv.create

     // Register the executor backend (built by backendCreateFn) as the "Executor" endpoint
     // on the executor's RpcEnv.
     env.rpcEnv.setupEndpoint("Executor",
            backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
    

    后续操作流程与 driver 端相同……

    相关文章

      网友评论

        本文标题:spark通信-源码分析

        本文链接:https://www.haomeiwen.com/subject/pmtygltx.html