美文网首页Spark
Spark源码分析之Master的启动流程

Spark源码分析之Master的启动流程

作者: 叫我不矜持 | 来源:发表于2019-06-13 21:07 被阅读0次

    准备

    本文主要对Master的启动流程源码进行分析。Spark源码版本为2.3.1。

    阅读源码首先从启动脚本入手,看看首先加载的是哪个类,我们看一下start-master.sh启动脚本中的具体内容。

    脚本代码

    可以看到这里加载的类是org.apache.spark.deploy.master.Master,好那我们的源码寻觅之旅就从这开始...

    源码分析

    打开源码,我们发现Master是伴生关系的一组类,我们直接定位到Master的main函数

    //主方法
      def main(argStrings: Array[String]) {
        Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
          exitOnUncaughtException = false))
        Utils.initDaemon(log)
        val conf = new SparkConf
        val args = new MasterArguments(argStrings, conf)
        /**
          * 创建RPC 环境和Endpoint (RPC 远程过程调用),在Spark中 Driver, Master ,Worker角色都有各自的Endpoint,相当于各自的通信邮箱。
          *
          */
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
      }
    

    发现该函数除了做了一些配置文件和args参数的准备之外,调用了startRpcEnvAndEndpoint函数,我们跟进去看看

    /**
       * Start the Master and return a three tuple of:
       *   (1) The Master RpcEnv
       *   (2) The web UI bound port
       *   (3) The REST server bound port, if any
       */
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
    
        /**
          * 创建RPC(Remote Procedure Call )环境  ,Remote Procedure Call
          * 这里只是创建准备好Rpc的环境,后面会向RpcEnv中注册 角色【Driver,Master,Worker,Executor】
          */
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
       .....
      }
    

    首先上面这段代码通过RpcEnv 构建了一个RPC通信环境,为之后的RPC通信做准备,Spark底层的通信是基于Netty的NIO模型,
    这里每个Rpc端点运行时依赖的上下文环境称之为RpcEnv。

    接下来我们直接跟踪到create方法中

    def create(
          name: String,
          bindAddress: String,
          advertiseAddress: String,
          port: Int,
          conf: SparkConf,
          securityManager: SecurityManager,
          numUsableCores: Int,
          clientMode: Boolean): RpcEnv = {
        val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
          numUsableCores, clientMode)
        //创建NettyRpc 环境
        new NettyRpcEnvFactory().create(config)
      }
    

    可以看到,这里构建了NettyRpcEnvFactory来创建NettyRpc 环境,NettyRpcEnvFactory的create函数又做了那些事呢?

    def create(config: RpcEnvConfig): RpcEnv = {
        val sparkConf = config.conf
        // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
        // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
        val javaSerializerInstance =
          new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
        /**
          * 创建nettyRPC通信环境。
          * 当new NettyRpcEnv时会做一些初始化:
          *   Dispatcher:这个对象中有存放消息的队列和消息的转发
          *   TransportContext:可以创建了NettyRpcHandler
          */
        val nettyEnv =
          new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
            config.securityManager, config.numUsableCores)
        if (!config.clientMode) {
          //启动nettyRPCEnv
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
          }
          try {
            //以上  startNettyRpcEnv 匿名函数在此处会最终被调用,当匿名函数被调用时,重点方法是483行 nettyEnv.startServer 方法
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
          } catch {
            case NonFatal(e) =>
              nettyEnv.shutdown()
              throw e
          }
        }
        nettyEnv
      }
    

    可以看到上面首先构建了一个序列化的实例对象,然后开始着手构架nettyRPC通信环境。在new NettyRpcEnv()时会做一些初始化的工作,如下所示

     /**
        * dispatcher 这个对象中有消息队列和消息的循环获取转发
        */
      private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
    
     /**
        * TransportContext 中会创建 NettyPpcHandler
        * TransportContext 这个对象中参数类型 RpcHandler  就是这里的 NettyRpcHandler
        */
      private val transportContext = new TransportContext(transportConf,
        new NettyRpcHandler(dispatcher, this, streamManager))
    

    这里的Dispatcher是一个消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。

    这里的transportContext 是构建传输的上下文环境,用于创建TransportServer和TransportClientFactory同时,通过TransportChannelHandler来设置Netty的Channel pipelines。

    下面我们回到NettyRpcEnvFactory的create方法

    val nettyEnv =
          new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
            config.securityManager, config.numUsableCores)
        if (!config.clientMode) {
          //启动nettyRPCEnv
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
          }
          try {
            //以上  startNettyRpcEnv 匿名函数在此处会最终被调用,当匿名函数被调用时,重点方法是483行 nettyEnv.startServer 方法
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
          } catch {
            case NonFatal(e) =>
              nettyEnv.shutdown()
              throw e
          }
        }
    

    nettyEnv 准备好了之后,会构建一个函数startNettyRpcEnv,然后startServiceOnPort在会调用startNettyRpcEnv,进而调用nettyEnv的startServer函数,来启动Server

    //启动Rpc 服务
      def startServer(bindAddress: String, port: Int): Unit = {
        val bootstraps: java.util.List[TransportServerBootstrap] =
          if (securityManager.isAuthenticationEnabled()) {
            java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
          } else {
            java.util.Collections.emptyList()
          }
    
        /**
          * transportContext已经被创建,这里createServer 就会绑定地址和端口,启动Netty Rpc 服务
          */
        server = transportContext.createServer(bindAddress, port, bootstraps)
        dispatcher.registerRpcEndpoint(
          RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
      }
    

    由于直接初始化时transportContext就已经被创建,这里createServer 就会绑定地址和端口,启动Netty Rpc 服务,createServer最在构建TransportServer实例时,会调用init初始化方法,在这里以前了解过Netty的同学就会非常熟悉了.

    private void init(String hostToBind, int portToBind) {
    
        IOMode ioMode = IOMode.valueOf(conf.ioMode());
        EventLoopGroup bossGroup =
          NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
        EventLoopGroup workerGroup = bossGroup;
    
        PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
          conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
    
        bootstrap = new ServerBootstrap()
          .group(bossGroup, workerGroup)
          .channel(NettyUtils.getServerChannelClass(ioMode))
          .option(ChannelOption.ALLOCATOR, allocator)
          .childOption(ChannelOption.ALLOCATOR, allocator);
    
        this.metrics = new NettyMemoryMetrics(
          allocator, conf.getModuleName() + "-server", conf);
    
        if (conf.backLog() > 0) {
          bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
        }
    
        if (conf.receiveBuf() > 0) {
          bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
        }
    
        if (conf.sendBuf() > 0) {
          bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
        }
    
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            RpcHandler rpcHandler = appRpcHandler;
            for (TransportServerBootstrap bootstrap : bootstraps) {
              rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
            }
            //初始化网络通信管道
            context.initializePipeline(ch, rpcHandler);
          }
        });
    
        InetSocketAddress address = hostToBind == null ?
            new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
        channelFuture = bootstrap.bind(address);
        channelFuture.syncUninterruptibly();
    
        port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
        logger.debug("Shuffle server started on port: {}", port);
    }
    

    init 方法就是去初始化一个绑定的主机和端口,创建nettyRPC通信,通过之前的rpcHandler来初始化网络通信管道,同时会调用createChannelHandler函数,创建处理消息的 channelHandler用于处理客户端请求消息和服务端回应消息

    private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
        TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
        TransportClient client = new TransportClient(channel, responseHandler);
        TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
          rpcHandler, conf.maxChunksBeingTransferred());
        /**
         *   由以上  responseHandler   client    requestHandler  三个handler构建 TransportChannelHandler
         *   new TransportChannelHandler 这个对象中有 【channelRead() 方法】,用于读取接收到的消息
         */
        return new TransportChannelHandler(client, responseHandler, requestHandler,
          conf.connectionTimeoutMs(), closeIdleConnections);
      }
    

    最终Rpc的环境就准备好了,后面会向RpcEnv中注册 角色 Driver,Master,Worker,Executor。我们回到Master的startRpcEnvAndEndpoint函数

     /**
          * 向RpcEnv 中 注册Master
          *
          * rpcEnv.setupEndpoint(name,new Master)
          * 这里new Master 的Master 是一个伴生类,继承了 ThreadSafeRpcEndpoint,归根结底继承到了 Trait 接口  RpcEndpoint
        val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
    

    这里会向RpcEnv中注册Master,调用了setupEndpoint函数,传入了Master的实例对象,这里的Master 是一个伴生类,继承了 ThreadSafeRpcEndpoint,也是RpcEndpoint的实现类,而什么是RpcEndpoint?

    RpcEndpoint:RPC端点 ,Spark针对于每个节点(Client/Master/Worker)都称之一个Rpc端点 ,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher。

    EndPoint中存在

    • onstart() :启动当前Endpoint
    • receive() :负责收消息
    • receiveAndReply():接受消息并回复

    同时Endpoint 还有各自的引用,方便其他Endpoint发送消息,直接引用对方的EndpointRef 即可找到对方的Endpoint,上面源码中的masterEndpoint 就是Master的Endpoint引用 RpcEndpointRef 。

    RpcEndpointRef中存在

    • send():发送消息
    • ask() :请求消息,并等待回应。

    接下来我们看看NettyRpcEnv中的setupEndpoint函数

    override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)
      }
    

    上面直接调用了dispatcher实例,注册RpcEndpoint

    def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        val addr = RpcEndpointAddress(nettyEnv.address, name)
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
        synchronized {
          if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped")
          }
          //这里 new EndpointData使用到了endpoint,当new Inbox 时向消息队列中放入OnStart样例类标识
          if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
          //获取刚刚封装的EndPointData
          val data: EndpointData = endpoints.get(name)
          endpointRefs.put(data.endpoint, data.ref)
    
          /**
            * receivers 这个消息队列中放着应该去哪个Endpoint 中获取Message 处理
            * 这里其实就是进入 Dispatcher 当前这个类中的 MessageLoop 方法。这个方法当new Dispatcher后会一直运行。
            * 将消息放入待处理的消息队列中,消息首先找到对应的Endpoint ,再会获取当前Endpoint的Inbox 中message,使用process 方法处理
            */
          receivers.offer(data)  // for the OnStart message
        }
        endpointRef
      }
    

    上面的new EndpointData使用到了endpoint,EndpointData其实就是对endpoint和endpointRef的封装,同时内部还构建了Inbox

    private class EndpointData(
          val name: String,
          val endpoint: RpcEndpoint,
          val ref: NettyRpcEndpointRef) {
        //将endpoint封装到Inbox中
        val inbox = new Inbox(ref, endpoint)
      }
    

    这里的Inbox是其实就是消息收件箱,一个本地端点对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部待Receiver Queue中,另外Dispatcher创建时会启动一个单独线程进行轮询Receiver Queue,进行收件箱消息消费。

    其实Inbox实例在构建过程中也会有消息的存入。

    private[netty] class Inbox(
        val endpointRef: NettyRpcEndpointRef,
        val endpoint: RpcEndpoint)
      extends Logging {
    ....
    @GuardedBy("this")
      protected val messages = new java.util.LinkedList[InboxMessage]()
    
      // OnStart should be the first message to process
      //当注册endpoint时都会调用这个异步方法,messags中放入一个OnStart样例类消息对象
      inbox.synchronized {
        messages.add(OnStart)
      }
    ....
    

    可以看到,构建inbox时会向messages中加入一个OnStart消息,该消息会被inbox类中的process所消费,只不过触发时机还在后面。

    回到Dispatcher的registerRpcEndpoint函数

          if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
          //获取刚刚封装的EndPointData
          val data: EndpointData = endpoints.get(name)
          endpointRefs.put(data.endpoint, data.ref)
    
          /**
            * receivers 这个消息队列中放着应该去哪个Endpoint 中获取Message 处理
            * 这里其实就是进入 Dispatcher 当前这个类中的 MessageLoop 方法。这个方法当new Dispatcher后会一直运行。
            * 将消息放入待处理的消息队列中,消息首先找到对应的Endpoint ,再会获取当前Endpoint的Inbox 中message,使用process 方法处理
            */
          receivers.offer(data)  // for the OnStart message
        }
        endpointRef
    

    这里发现刚才构建的EndpointData会被放入到endpoints中,endpoints和endpointRefs的类型都是ConcurrentHashMap,对这两个容器进行相应的操作之后,就会调用receivers.offer(data)。

    receivers是Dispatcher中的一个消息队列,这个消息队列还有一个线程池来进行消息的消费,整个模式构成一个生产者消费者模式,因此这里将data消息加入到消息队列中,会触发线程消费。

      private val threadpool: ThreadPoolExecutor = {
        val availableCores =
          if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
        val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
          math.max(2, availableCores))
        val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
        for (i <- 0 until numThreads) {
          pool.execute(new MessageLoop)
        }
        pool
      }
    
    /** Message loop used for dispatching messages. */
      private class MessageLoop extends Runnable {
        override def run(): Unit = {
          try {
            while (true) {
              try {
                //take 出来消息一直处理
                val data: EndpointData = receivers.take()
                if (data == PoisonPill) {
                  // Put PoisonPill back so that other MessageLoops can see it.
                  receivers.offer(PoisonPill)
                  return
                }
                //调用process 方法处理消息
                data.inbox.process(Dispatcher.this)
              } catch {
                case NonFatal(e) => logError(e.getMessage, e)
              }
            }
          } catch {
            case ie: InterruptedException => // exit
          }
        }
      }
    

    可以看到上面的MessageLoop中,通过消息队列receivers取出一条消息,该消息的类型为EndpointData,内部都封装了inbox实例,因此直接调用该实例的process函数去处理消息,我们之前在inbox实例构建过程中,发现会向inbox内部的消息容器中放入一条onStart消息,因此我们看一下process函数是如何处理该消息的。

    def process(dispatcher: Dispatcher): Unit = {
    
    case OnStart =>
                //调用Endpoint 的onStart函数
                endpoint.onStart()
                if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
                  inbox.synchronized {
                    if (!stopped) {
                      enableConcurrent = true
                    }
                  }
                }
    }
    

    process中对onStart的处理为,调用endpoint的onStart函数,而endpoint我们还记得是啥吗,这里是我们直接构建的Master实例,因此我们回到Master中去看看onStart()函数的处理过程。

    override def onStart(): Unit = {
    .....
     val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
          case "ZOOKEEPER" =>
            logInfo("Persisting recovery state to ZooKeeper")
            val zkFactory =
              new ZooKeeperRecoveryModeFactory(conf, serializer)
            (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
          case "FILESYSTEM" =>
            val fsFactory =
              new FileSystemRecoveryModeFactory(conf, serializer)
            (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
          case "CUSTOM" =>
            val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
            val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
              .newInstance(conf, serializer)
              .asInstanceOf[StandaloneRecoveryModeFactory]
            (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
          case _ =>
            (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
        }
        persistenceEngine = persistenceEngine_
        leaderElectionAgent = leaderElectionAgent_
    }
    

    onStart函数涉及到webUI的启动,applicationMetricsSystem和masterMetricsSystem的启动过程,如果设置了restServer还涉及到启动过程。

    同时Master的onStart函数的部分代码如上所示,我么可以看到,这里涉及到如何在HA的环境下选主的过程。内部会根据配置决定采用哪种方式,是ZOOKEEPER还是文件系统的方式来进行。

    生产环境下一般采用Zookeeper做HA,Zookeeper会自动化管理 Master的切换;

    采用Zookeeper做HA的时候,Zookeeper会负责保存整个Spark集群运行时候的元数据:workers、Drivers、Applications、Executors;

    Zookeeper遇到当前Active级别的Master出现故障的时候会从StandbyMaster中选取一台作为Active Master,但是要注意,被选举后到成为真正的ActiveMaster之间需要从Zookeeper中获取集群当前运行状态的元数据信息并进行恢复;

    在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为Spark Application在运行前就已经通过ClusterManager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系的!

    在Master的切换过程中唯一的影响是不能提交新的job:一方面不能提交新的应用程序给集群,因为只有ActiveMaster才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因为Action操作触发新的Job的提交请求;

    相关文章

      网友评论

        本文标题:Spark源码分析之Master的启动流程

        本文链接:https://www.haomeiwen.com/subject/cbjefctx.html