Driver Process Startup

Author: code_solve | Published 2018-06-08 22:00

    Preface

    Taking the standalone cluster deploy mode as an example, this article walks through how the driver is started, from the perspective of the Spark source code.

    1. When we run the spark-submit script, a look at the script shows that it launches an org.apache.spark.deploy.SparkSubmit process to handle the submission.

    2. The main() method then dispatches to submit():

    def main(args: Array[String]): Unit = {
        val appArgs = new SparkSubmitArguments(args)
        if (appArgs.verbose) {
          // scalastyle:off println
          printStream.println(appArgs)
          // scalastyle:on println
        }
        appArgs.action match {
          case SparkSubmitAction.SUBMIT => submit(appArgs)
          case SparkSubmitAction.KILL => kill(appArgs)
          case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
        }
      }
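
    Since neither --kill nor --status was passed, appArgs.action resolves to SUBMIT. Inside SparkSubmitArguments the default is applied roughly as follows (paraphrased, not a verbatim quote of the source):

    // Paraphrased: when no --kill or --status flag was supplied, the action
    // falls back to SUBMIT, which is why submit(appArgs) is the branch taken
    // for a normal spark-submit run.
    action = Option(action).getOrElse(SparkSubmitAction.SUBMIT)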
    
    3. submit() mainly involves two methods: prepareSubmitEnvironment() and runMain(). Its definition begins as follows:
    private def submit(args: SparkSubmitArguments): Unit = {
        val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
    
        def doRunMain(): Unit = {
          if (args.proxyUser != null) {
            val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
              UserGroupInformation.getCurrentUser())
            try {
              proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
                override def run(): Unit = {
                  runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
                }
              })
            } catch {
              case e: Exception =>
                // Hadoop's AuthorizationException suppresses the exception's stack trace, which
                // makes the message printed to the output by the JVM not very helpful. Instead,
                // detect exceptions with empty stack traces here, and treat them differently.
                if (e.getStackTrace().length == 0) {
                  // scalastyle:off println
                  printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
                  // scalastyle:on println
                  exitFn(1)
                } else {
                  throw e
                }
            }
          } else {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        }

        // ... (the rest of submit() is omitted here; both the REST and the
        // legacy submission paths end up calling doRunMain())
    }

    • prepareSubmitEnvironment(): its job boils down to the four points described in the source comment:

    Prepare the environment for submitting an application.
    This returns a 4-tuple:
    (1) the arguments for the child process,
    (2) a list of classpath entries for the child,
    (3) a map of system properties, and
    (4) the main class for the child

    • It prepares the environment for submitting the application through a child process (we will call this child process the client process here; note that it is not the AppClient we usually talk about), namely:
      (1) the arguments the child process runs with
      (2) the runtime classpath entries for the child
      (3) a map of system properties
      (4) the main class that serves as the child's entry point

    In the following excerpt we can see that for a standalone-cluster submission through the legacy gateway, the client process's main class is set to org.apache.spark.deploy.Client (the REST gateway uses org.apache.spark.deploy.rest.RestSubmissionClient instead); a sketch of the resulting values follows the snippet:

        ...
       if (args.isStandaloneCluster) {
          if (args.useRest) {
            childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
            childArgs += (args.primaryResource, args.mainClass)
          } else {
            // In legacy standalone cluster mode, use Client as a wrapper around the user class
            childMainClass = "org.apache.spark.deploy.Client"
            if (args.supervise) { childArgs += "--supervise" }
            Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
            Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
            childArgs += "launch"
            childArgs += (args.master, args.primaryResource, args.mainClass)
          }
          if (args.childArgs != null) {
            childArgs ++= args.childArgs
          }
        }
        ...
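
    To make the four returned values concrete, here is a rough sketch of what prepareSubmitEnvironment() ends up producing for a legacy standalone-cluster submission, following the branch above. Every concrete value below (host names, paths, class names) is invented purely for illustration:

    // Illustrative only -- all literals are made-up placeholders.
    val childMainClass = "org.apache.spark.deploy.Client"
    val childArgs = Seq(
      "--memory", "1g",               // from --driver-memory, if set
      "--cores", "1",                 // from --driver-cores, if set
      "launch",
      "spark://master:7077",          // args.master
      "hdfs:///apps/my-app.jar",      // args.primaryResource
      "com.example.MyApp")            // args.mainClass
    // childClasspath: the application jar plus any --jars entries
    // sysProps: the spark.* settings collected from the command line and the defaults file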
    
    • runMain(): as its name suggests, this method runs a main method, namely the main method of the client class determined above. It is worth pointing out that the main method is invoked via reflection inside the current SparkSubmit JVM rather than by launching a new process.
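
    A minimal sketch of the reflection pattern that runMain() relies on (simplified: the real method also builds a child class loader, adds the classpath entries to it, and sets the system properties first; the values below are illustrative):

    // Load the client class and invoke its static main(String[]) reflectively,
    // staying inside the current SparkSubmit JVM.
    val childMainClass = "org.apache.spark.deploy.Client"
    val childArgs = Array("launch", "spark://master:7077", "hdfs:///apps/my-app.jar", "com.example.MyApp")

    val mainClass  = Class.forName(childMainClass, true, Thread.currentThread.getContextClassLoader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, childArgs)   // main is static, so the receiver is null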
    4. Next, let's take a look at the main method of org.apache.spark.deploy.Client:
    object Client {
      def main(args: Array[String]) {
        // scalastyle:off println
        if (!sys.props.contains("SPARK_SUBMIT")) {
          println("WARNING: This client is deprecated and will be removed in a future version of Spark")
          println("Use ./bin/spark-submit with \"--master spark://host:port\"")
        }
        // scalastyle:on println
    
        val conf = new SparkConf()
        val driverArgs = new ClientArguments(args)
    
        if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
          conf.set("spark.akka.logLifecycleEvents", "true")
        }
        conf.set("spark.rpc.askTimeout", "10")
        conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
        Logger.getRootLogger.setLevel(driverArgs.logLevel)
    
        val rpcEnv =
          RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    
        val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
          map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
        rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    
        rpcEnv.awaitTermination()
      }
    }
    
    • This code is fairly straightforward: it mostly does some configuration, and the key step is creating a ClientEndpoint and passing it the masterEndpoints. From here on, the client communicates with the master entirely through this ClientEndpoint, so let's see what ClientEndpoint looks like.
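
    For context, every endpoint registered with rpcEnv.setupEndpoint(...) goes through an onStart / receive / onStop lifecycle. A simplified sketch of that shape (the endpoint name here is made up, and this is not the exact internal API, which is private to Spark):

    // Rough outline of an endpoint in Spark's internal RPC layer.
    class MyEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
      override def onStart(): Unit = {
        // invoked once the endpoint has been registered with the RpcEnv
      }
      override def receive: PartialFunction[Any, Unit] = {
        case msg => // one-way messages arrive here
      }
      override def onStop(): Unit = {
        // cleanup when the endpoint is unregistered
      }
    }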
    5. Let's start with ClientEndpoint's onStart() method:
    override def onStart(): Unit = {
        driverArgs.cmd match {
          case "launch" =>
            // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
            //       truncate filesystem paths similar to what YARN does. For now, we just require
            //       people call `addJar` assuming the jar is in the same directory.
            val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
    
            val classPathConf = "spark.driver.extraClassPath"
            val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
              cp.split(java.io.File.pathSeparator)
            }
    
            val libraryPathConf = "spark.driver.extraLibraryPath"
            val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
              cp.split(java.io.File.pathSeparator)
            }
    
            val extraJavaOptsConf = "spark.driver.extraJavaOptions"
            val extraJavaOpts = sys.props.get(extraJavaOptsConf)
              .map(Utils.splitCommandString).getOrElse(Seq.empty)
            val sparkJavaOpts = Utils.sparkJavaOpts(conf)
            val javaOpts = sparkJavaOpts ++ extraJavaOpts
            val command = new Command(mainClass,
              Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
              sys.env, classPathEntries, libraryPathEntries, javaOpts)
    
            val driverDescription = new DriverDescription(
              driverArgs.jarUrl,
              driverArgs.memory,
              driverArgs.cores,
              driverArgs.supervise,
              command)
            asyncSendToMasterAndForwardReply[SubmitDriverResponse](
              RequestSubmitDriver(driverDescription))
    
          case "kill" =>
            val driverId = driverArgs.driverId
            asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
        }
      }
    
    • A few key points in this code:
    1. val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" specifies the class that our driver process is launched with.
    2. asyncSendToMasterAndForwardReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription)) packs the driver launch parameters into a RequestSubmitDriver message, submits it to the master, and asynchronously forwards the master's reply back to this endpoint (see the sketch below).
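
    For completeness, here is a rough sketch (simplified, not the verbatim source) of how ClientEndpoint handles that asynchronous reply in its receive method; the reply arrives as a SubmitDriverResponse carrying the driver id assigned by the master:

    override def receive: PartialFunction[Any, Unit] = {
      case SubmitDriverResponse(master, success, driverId, message) =>
        logInfo(message)
        if (success) {
          // remember which master answered and poll it for the new driver's status
          activeMasterEndpoint = master
          pollAndReportStatus(driverId.get)
        } else {
          System.exit(-1)
        }
    }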

    That is where this article ends. The steps that follow are covered by plenty of blog posts online, so I will not ramble on about them here. I wrote this piece mainly because I was once asked how the driver gets started, and I could not find an explanation online that really traces it back to its roots; then again, maybe I just did not find the right one.
