Spark Source Code Reading 1

Author: Avanpourm | Published 2016-07-07 00:42

    I have recently been refactoring our audio/video transcoding system and am trying Spark as the framework for distributing concurrent transcoding tasks. Using something I don't really understand makes me uneasy, so I decided to read the source code.

    Master startup process

    The master is started with:

    ./sbin/start-master.sh
    

    So we start from this script and trace Spark's startup flow.
    We follow only the main path and ignore the side branches for now, to get a picture of the overall flow first.
    Reading start-master.sh, the statement actually executed is:

    ${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
     --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port  $SPARK_MASTER_WEBUI_PORT \
     $ORIGINAL_ARGS
    

    where CLASS is:

    # NOTE: This exact class name is matched downstream by SparkSubmit.
    # Any changes need to be reflected there.
    CLASS="org.apache.spark.deploy.master.Master"
    
    • The remaining arguments are mainly port settings plus a few startup options, which we can ignore for now.
      Simplified, the call becomes:
    spark-daemon.sh start  org.apache.spark.deploy.master.Master 1
    

    Looking at sbin/spark-daemon.sh, the key line is:

    nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null
    

    Here the first argument, start, selects this branch, and $command is the class we passed in, org.apache.spark.deploy.master.Master.
    Looking at bin/spark-class, we find the real entry point:

    CMD=()
    while IFS= read -d '' -r ARG; do
      CMD+=("$ARG")
    done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
    exec "${CMD[@]}"
    

    $RUNNER and $LAUNCH_CLASSPATH are the path to the java executable and the launcher classpath, respectively.
    The script invokes org.apache.spark.launcher.Main to generate the java command line, reads that output into the CMD array, and then executes it with exec. The main class run by that command is the **org.apache.spark.deploy.master.Master** mentioned above.
    So we have found the program's real entry point:

    org.apache.spark.deploy.master.Master
    

    The file lives at:

    core/src/main/scala/org/apache/spark/deploy/master/Master.scala

    The entry point is in private[deploy] object Master extends Logging, shown below:

    private[deploy] object Master extends Logging {
      val SYSTEM_NAME = "sparkMaster"
      val ENDPOINT_NAME = "Master"

      def main(argStrings: Array[String]) {
        SignalLogger.register(log)
        val conf = new SparkConf
        val args = new MasterArguments(argStrings, conf)
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
      }

      /**
       * Start the Master and return a three tuple of:
       *   (1) The Master RpcEnv
       *   (2) The web UI bound port
       *   (3) The REST server bound port, if any
       */
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) // create the RpcEnv (Netty by default)
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    }
    

    The main work here is to create an RpcEnv and register the Master in it as an endpoint.
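
    The askWithRetry[BoundPortsResponse](BoundPortsRequest) call above then asks the newly registered endpoint for its bound ports and blocks until a typed reply comes back. A toy, self-contained model of that ask/reply round trip (ToyEndpointRef and BoundPorts are hypothetical names; in Spark the hop goes through the RpcEnv and its Dispatcher rather than a direct call):

    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.duration._

    // Toy stand-ins; Spark's real messages are BoundPortsRequest / BoundPortsResponse.
    final case class BoundPorts(webUiPort: Int, restPort: Option[Int])

    class ToyEndpointRef {
      def ask[T](handle: Promise[T] => Unit): Future[T] = {
        val p = Promise[T]()
        handle(p) // the "endpoint" side completes the promise with its reply
        p.future
      }
    }

    object AskSketch {
      def main(args: Array[String]): Unit = {
        val ref = new ToyEndpointRef
        // "Master" side replies with its bound ports (8080 / 6066 are the usual defaults).
        val reply = ref.ask[BoundPorts](p => p.success(BoundPorts(8080, Some(6066))))
        println(Await.result(reply, 1.second)) // caller side, analogous to askWithRetry
      }
    }
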
    Stepping into RpcEnv.create:

      def create(
          name: String,
          host: String,
          port: Int,
          conf: SparkConf,
          securityManager: SecurityManager,
          clientMode: Boolean = false): RpcEnv = {
        // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
        val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
        getRpcEnvFactory(conf).create(config)
      }
    

    Here getRpcEnvFactory(conf).create(config) builds the RpcEnv and returns it.

      private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
        val rpcEnvNames = Map(
          "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
          "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
        val rpcEnvName = conf.get("spark.rpc", "netty")
        val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
        Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
      }
    

    In practice Netty is used as the asynchronous NIO framework, so the class used here is
    org.apache.spark.rpc.netty.NettyRpcEnvFactory,
    the factory that produces the RpcEnv.
    Path:

    core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
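
    Before opening it, note that the spark.rpc setting read by getRpcEnvFactory is what selects this backend. A minimal sketch of choosing it explicitly (illustrative only: RpcEnv and SecurityManager are private[spark], so such code would have to live inside the org.apache.spark package, and the host and port below are placeholders):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.RpcEnv

    // "netty" is the default; "akka" maps to AkkaRpcEnvFactory, and a fully
    // qualified factory class name is also accepted (the getOrElse fallback).
    val conf = new SparkConf().set("spark.rpc", "netty")
    val securityMgr = new SecurityManager(conf)
    // Same call as in startRpcEnvAndEndpoint above.
    val rpcEnv = RpcEnv.create("sparkMaster", "127.0.0.1", 7077, conf, securityMgr)
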

    Let's look at this factory class's create method:

    private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
    
      def create(config: RpcEnvConfig): RpcEnv = {
        val sparkConf = config.conf
        // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
        // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
        val javaSerializerInstance =
          new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
        val nettyEnv =
          new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
        if (!config.clientMode) {
          val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            nettyEnv.startServer(actualPort)
            (nettyEnv, nettyEnv.address.port)
          }
          try {
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
          } catch {
            case NonFatal(e) =>
              nettyEnv.shutdown()
              throw e
          }
        }
        nettyEnv
      }
    }
    

    The RpcEnv implementation is NettyRpcEnv.

    It uses

     Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    

    to start the service; startNettyRpcEnv in turn calls nettyEnv.startServer(actualPort):

      def startServer(port: Int): Unit = {
        val bootstraps: java.util.List[TransportServerBootstrap] =
          if (securityManager.isAuthenticationEnabled()) {
            java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
          } else {
            java.util.Collections.emptyList()
          }
        server = transportContext.createServer(host, port, bootstraps)
        dispatcher.registerRpcEndpoint(
          RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
      }
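
    As an aside, Utils.startServiceOnPort (used above to run startNettyRpcEnv) retries on successive ports when binding fails, up to spark.port.maxRetries attempts. A simplified sketch of that retry pattern, not Spark's actual implementation (which also handles ephemeral port 0 and ports that run past the valid range):

    // startService returns (service, boundPort), like startNettyRpcEnv above.
    def startWithRetries[T](startPort: Int, maxRetries: Int)
                           (startService: Int => (T, Int)): (T, Int) = {
      var offset = 0
      while (offset <= maxRetries) {
        try {
          return startService(startPort + offset)
        } catch {
          case _: java.net.BindException =>
            offset += 1 // port already in use: try the next one
        }
      }
      throw new java.net.BindException(
        s"Failed to bind to any port in [$startPort, ${startPort + maxRetries}]")
    }
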
    

    Back in startRpcEnvAndEndpoint in Master.scala:

      val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
        new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    

    This registers the Master into the RpcEnv and returns a masterEndpoint reference.
    In the Netty implementation, messages are delivered through a Dispatcher.
    Opening Dispatcher.scala, we locate the message-dispatch loop:

      /** Message loop used for dispatching messages. */
      private class MessageLoop extends Runnable {
        override def run(): Unit = {
          try {
            while (true) {
              try {
                val data = receivers.take()
                if (data == PoisonPill) {
                  // Put PoisonPill back so that other MessageLoops can see it.
                  receivers.offer(PoisonPill)
                  return
                }
                data.inbox.process(Dispatcher.this)
              } catch {
                case NonFatal(e) => logError(e.getMessage, e)
              }
            }
          } catch {
            case ie: InterruptedException => // exit
          }
        }
      }
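
    This loop is a standard blocking-queue consumer with a poison-pill shutdown. A minimal, self-contained sketch of the same pattern (Msg and Work are hypothetical types; in Spark the queue holds endpoint entries and the work step is data.inbox.process):

    import java.util.concurrent.LinkedBlockingQueue

    // Hypothetical message types; Spark's queue holds EndpointData entries instead.
    sealed trait Msg
    case object PoisonPill extends Msg
    final case class Work(payload: String) extends Msg

    object LoopSketch {
      val queue = new LinkedBlockingQueue[Msg]()

      // One consumer; Spark runs several MessageLoops in a fixed thread pool.
      val loop: Runnable = new Runnable {
        override def run(): Unit = {
          while (true) {
            queue.take() match {
              case PoisonPill =>
                queue.offer(PoisonPill) // put it back so sibling loops also stop
                return
              case Work(p) =>
                println(s"processing $p") // Spark calls data.inbox.process(...) here
            }
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val t = new Thread(loop)
        t.start()
        queue.put(Work("hello"))
        queue.put(PoisonPill)
        t.join()
      }
    }
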
    

    Each message is handed to its inbox for processing:

     data.inbox.process(Dispatcher.this) 
    

    Stepping into

    core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala

    we locate the process method:

       /**
       * Process stored messages.
       */
      def process(dispatcher: Dispatcher): Unit = {
        var message: InboxMessage = null
        inbox.synchronized {
          if (!enableConcurrent && numActiveThreads != 0) {
            return
          }
          message = messages.poll()
          if (message != null) {
            numActiveThreads += 1
          } else {
            return
          }
        }
        while (true) {
          safelyCall(endpoint) {
            message match {
              case RpcMessage(_sender, content, context) =>
                try {
                  endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                    throw new SparkException(s"Unsupported message $message from ${_sender}")
                  })
                } catch {
                  case NonFatal(e) =>
                    context.sendFailure(e)
                    // Throw the exception -- this exception will be caught by the safelyCall function.
                    // The endpoint's onError function will be called.
                    throw e
                }
    
              case OneWayMessage(_sender, content) =>
                endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
                  throw new SparkException(s"Unsupported message $message from ${_sender}")
                })
    
              case OnStart =>
                endpoint.onStart()
                if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
                  inbox.synchronized {
                    if (!stopped) {
                      enableConcurrent = true
                    }
                  }
                }
    
              case OnStop =>
                val activeThreads = inbox.synchronized { inbox.numActiveThreads }
                assert(activeThreads == 1,
                  s"There should be only a single active thread but found $activeThreads threads.")
                dispatcher.removeRpcEndpointRef(endpoint)
                endpoint.onStop()
                assert(isEmpty, "OnStop should be the last message")
    
              case RemoteProcessConnected(remoteAddress) =>
                endpoint.onConnected(remoteAddress)
    
              case RemoteProcessDisconnected(remoteAddress) =>
                endpoint.onDisconnected(remoteAddress)
    
              case RemoteProcessConnectionError(cause, remoteAddress) =>
                endpoint.onNetworkError(cause, remoteAddress)
            }
          }
    
          inbox.synchronized {
            // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
            // every time.
            if (!enableConcurrent && numActiveThreads != 1) {
              // If we are not the only one worker, exit
              numActiveThreads -= 1
              return
            }
            message = messages.poll()
            if (message == null) {
              numActiveThreads -= 1
              return
            }
          }
        }
      }
    

    From this we can see that at startup the dispatcher calls:

    endpoint.onStart()
    

    Once started, the endpoint serves RPC calls, which are handled through:

    endpoint.receiveAndReply
    

    Here the endpoint is our Master, so let's look at these two functions in Master.

    • First, onStart():
      override def onStart(): Unit = {
        logInfo("Starting Spark master at " + masterUrl)
        logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
        webUi = new MasterWebUI(this, webUiPort)
        webUi.bind()
        masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
        checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CheckForWorkerTimeOut)
          }
        }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    
        if (restServerEnabled) {
          val port = conf.getInt("spark.master.rest.port", 6066)
          restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
        }
        restServerBoundPort = restServer.map(_.start())
    
        masterMetricsSystem.registerSource(masterSource)
        masterMetricsSystem.start()
        applicationMetricsSystem.start()
        // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
        // started.
        masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
        applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    
        val serializer = new JavaSerializer(conf)
        val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
          case "ZOOKEEPER" =>
            logInfo("Persisting recovery state to ZooKeeper")
            val zkFactory =
              new ZooKeeperRecoveryModeFactory(conf, serializer)
            (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
          case "FILESYSTEM" =>
            val fsFactory =
              new FileSystemRecoveryModeFactory(conf, serializer)
            (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
          case "CUSTOM" =>
            val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
            val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
              .newInstance(conf, serializer)
              .asInstanceOf[StandaloneRecoveryModeFactory]
            (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
          case _ =>
            (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
        }
        persistenceEngine = persistenceEngine_
        leaderElectionAgent = leaderElectionAgent_
      }
    

    The main actions are starting the web UI, starting the metrics systems, and setting up high availability for the Master (the recovery mode and leader election).
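
    The recovery-mode branch at the end is what gives the standalone Master high availability. A hedged sketch of the configuration that branch (and the ZooKeeper factory it creates) reads, with placeholder hostnames and paths; in a real deployment these properties are normally passed to the daemon via SPARK_DAEMON_JAVA_OPTS:

    import org.apache.spark.SparkConf

    // Values are placeholders; only the keys matter here.
    val haConf = new SparkConf()
      .set("spark.deploy.recoveryMode", "ZOOKEEPER")          // selects ZooKeeperRecoveryModeFactory
      .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181") // ZooKeeper ensemble to use
      .set("spark.deploy.zookeeper.dir", "/spark")            // znode under which state is persisted
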

    • Now the other function, receiveAndReply.
      This is where the Master does most of its work.

    It consists of a number of case clauses.
    Let's start with the first one:

    case RegisterWorker
    

    This is the registration handler invoked when a new worker starts up.
    The main part looks like this:

    // create a WorkerInfo describing the new worker
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      // persist the newly registered worker for recovery
      persistenceEngine.addWorker(worker)
      context.reply(RegisteredWorker(self, masterWebUiUrl))
      schedule() // reschedule to rebalance the cluster
    }
    
    • The key call is schedule():
     /**
       * Schedule the currently available resources among waiting apps. This method will be called
       * every time a new app joins or resource availability changes.
       */
      private def schedule(): Unit = {
        if (state != RecoveryState.ALIVE) {
          return
        }
        // Drivers take strict precedence over executors
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
        for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
          // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
          // start from the last worker that was assigned a driver, and continue onwards until we have
          // explored all alive workers.
          var launched = false
          var numWorkersVisited = 0
          while (numWorkersVisited < numWorkersAlive && !launched) {
            val worker = shuffledAliveWorkers(curPos)
            numWorkersVisited += 1
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              launchDriver(worker, driver)
              waitingDrivers -= driver
              launched = true
            }
            curPos = (curPos + 1) % numWorkersAlive
          }
        }
        startExecutorsOnWorkers()
      }
    

    The scheduling process is fairly simple.
    First it takes the workers whose state is ALIVE, in shuffled order.
    Then it walks the queue of waiting drivers and launches each driver, round-robin, on a worker with enough free memory and cores.
    Finally it walks the queue of waiting apps: for each app that still wants cores, it filters out the usable workers and allocates executors on them, increasing the app's parallelism.

    That last step, startExecutorsOnWorkers(), looks like this:

      /**
       * Schedule and launch executors on workers
       */
      private def startExecutorsOnWorkers(): Unit = {
        // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
        // in the queue, then the second app, etc.
        for (app <- waitingApps if app.coresLeft > 0) {
          val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
          // Filter out workers that don't have enough resources to launch an executor
          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
            .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
              worker.coresFree >= coresPerExecutor.getOrElse(1))
            .sortBy(_.coresFree).reverse
          val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    
          // Now that we've decided how many cores to allocate on each worker, let's allocate them
          for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
            allocateWorkerResourceToExecutors(
              app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
          }
        }
      }
    

    This is a simple FIFO scheme:
    scheduleExecutorsOnWorkers() computes how many cores each usable worker should contribute for the app, and
    allocateWorkerResourceToExecutors() then carries out the allocation on each of those workers.
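
    scheduleExecutorsOnWorkers() itself is not shown here, but in spread-out mode (the spreadOutApps flag above) it hands out cores in coresPerExecutor-sized chunks while cycling over the usable workers. A hedged simulation of that placement, ignoring the memory and executor-count limits the real method also enforces:

    // Hand out cores in coresPerExecutor-sized chunks, cycling over the workers
    // until the app's request is satisfied or no worker can take another chunk.
    def spreadOut(coresToAssign: Int, coresPerExecutor: Int, freeCores: Array[Int]): Array[Int] = {
      val assigned = Array.fill(freeCores.length)(0)
      var left = coresToAssign
      var progress = true
      while (left >= coresPerExecutor && progress) {
        progress = false
        var i = 0
        while (i < freeCores.length && left >= coresPerExecutor) {
          if (freeCores(i) - assigned(i) >= coresPerExecutor) {
            assigned(i) += coresPerExecutor
            left -= coresPerExecutor
            progress = true
          }
          i += 1
        }
      }
      assigned
    }

    // Example: 3 workers with 8 free cores each, an app wanting 10 cores at 2 per executor.
    // Spread-out placement yields Array(4, 4, 2), i.e. 2 + 2 + 1 executors.
    println(spreadOut(10, 2, Array(8, 8, 8)).mkString(", "))
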

    At this point the startup process is essentially complete, but two things remain unclear: how drivers and apps actually operate, and how application code gets submitted to the cluster.
