美文网首页
002 Spark shell 是怎么一回事

002 Spark shell 是怎么一回事

作者: Whaatfor | 来源:发表于2020-10-30 20:54 被阅读0次

    所有脚本和代码以 Spark 3.0.1 为准,Scala 版本为 2.12~

    (作为一个强迫症患者,为什么不选 3.0.0,因为 3.0.1 是稳定版本)

    Spark shell 是怎么一回事

    前文探究了 spark-submit 脚本是怎么工作的,现在来看看 Spark shell 都发生了些什么。

    熟悉 Scala 的朋友应该知道 Scala 有自己的 shell。其实简单点说,Spark shell 就是使用了 Scala shell 的解释模块,在初始化 shell 时同时初始化了一个 SparkSession 对象 spark,和一个 SparkContext 对象 sc。在 2.0 之前我记得只有 SparkContext 对象 sc,2.0 之后引入了 SparkSession 对象 spark。所以我们在 Spark shell 中可以直接通过这两个预置的对象进行简单的 Spark 应用开发。

    Spark shell 的本质就是一个 Spark 应用程序,但是通过 Scala shell 提供了交互式编程的接口,并预先初始化了上下文对象。

    spark-shell

    • 其实就是调用了 spark-submit 脚本,传入的主类是 org.apache.spark.repl.Main

    文件:${SPARK_HOME}/bin/spark-shell

     ...
     
     # line28:进入 posix模式
     set -o posix
    
     # line31:如果没有 SPARK_HOME 环境变量,就通过 bin 目录下的 find-spark-home 脚本查找
     if [ -z "${SPARK_HOME}" ]; then
       source "$(dirname "$0")"/find-spark-home
     fi
    
     ...
    
     # line45:Scala 默认不会使用 java 的 classpath,需要手动设置 "-Dscala.usejavacp=true";只对 Spark shell 指定该参数,因为 Scala REPL 有自己的类加载器,通过 spark.driver.extraClassPath 添加额外的 classpath 不会自动生效
     SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"
    
     # line47:主函数
     function main() {
       if $cygwin; then
         # Workaround for issue involving JLine and Cygwin
         # (see http://sourceforge.net/p/jline/bugs/40/).
         # If you're using the Mintty terminal emulator in Cygwin, may need to set the
         # "Backspace sends ^H" setting in "Keys" section of the Mintty options
         # (see https://github.com/sbt/sbt/issues/562).
         stty -icanon min 1 -echo > /dev/null 2>&1
         export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
         "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
         stty icanon echo > /dev/null 2>&1
       else
         export SPARK_SUBMIT_OPTS
         # line60:其实还是调用了 spark-submit 脚本
         "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
       fi
     }
    
     ...
    
     # line92:执行 main 函数
     main "$@"
     
     ...
    

    org.apache.spark.repl.Main

    既然启动的主程序是 org.apache.spark.repl.Main,那就来看看里面的 main 方法都做了些什么。

    • 调用 doMain 方法,传入 Scala 解释器 SparkILoop 对象
    • 调用 SparkILoopprocess 方法,运行解释器

    文件:${spark-project}/repl/src/main/scala/org/apache/spark/repl/Main.java

    // line56:main 方法调用了 doMain 方法,并传入了一个 SparkILoop 对象,该对象就是 Scala 解释器
    def main(args: Array[String]): Unit = {
      isShellSession = true
      doMain(args, new SparkILoop)
    }
    
    // line62:真正的主函数,前面都是一些准备工作,最主要的方法是调用了 interp.process(),启动解释器
    private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
      interp = _interp
      val jars = Utils.getLocalUserJarsForShell(conf)
      // Remove file:///, file:// or file:/ scheme if exists for each jar
      .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
      .mkString(File.pathSeparator)
      val interpArguments = List(
        "-Yrepl-class-based",
        "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
        "-classpath", jars
      ) ++ args.toList
    
      val settings = new GenericRunnerSettings(scalaOptionError)
      settings.processArguments(interpArguments, true)
    
      if (!hasErrors) {
        interp.process(settings) // Repl starts and goes in loop of R.E.P.L
        Option(sparkContext).foreach(_.stop)
      }
    }
    
    // line83:创建 SparkSession 对象的方法,会在 SparkILoop 的初始化语句中调用
    def createSparkSession(): SparkSession = {
      ...
    }
    

    org.apache.spark.repl.SparkILoop

    该类继承自 scala.tools.nsc.interpreter.ILoop,对其作了一些修改。主要关注 process 方法。

    • 初始化 SplashLoop 对象,这个对象是用来触发解释器进入循环的,它实现了 Runnable 接口,并且内部有一个 Thread 对象封装了自己
    • 初始化 SparkSession 对象 sparkSparkContext 对象 sc
    • 打印 Welcome 信息
    • 启动 SplashLoop 线程,等待用户输入代码
    • 编译用户输入的代码并执行代码,执行完之后等待下一次的输入,这个逻辑在 scala.tools.nsc.interpreter.ILoop 中,下面会展示

    文件:${spark-project}/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala

    // line45:初始化 spark 编程入口对象的语句,传给 Scala 的解释器,所以在 Spark shell 启动之后我们就可以使用这两个对象了,可以看到其中还引入了一些包
    val initializationCommands: Seq[String] = Seq(
      """
        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
            org.apache.spark.repl.Main.sparkSession
          } else {
            org.apache.spark.repl.Main.createSparkSession()
          }
        @transient val sc = {
          val _sc = spark.sparkContext
          if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
            val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
            if (proxyUrl != null) {
              println(
                s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
            } else {
              println(s"Spark Context Web UI is available at Spark Master Public URL")
            }
          } else {
            _sc.uiWebUrl.foreach {
              webUrl => println(s"Spark context Web UI available at ${webUrl}")
            }
          }
          println("Spark context available as 'sc' " +
            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
          println("Spark session available as 'spark'.")
          _sc
        }
        """,
      "import org.apache.spark.SparkContext._",
      "import spark.implicits._",
      "import spark.sql",
      "import org.apache.spark.sql.functions._"
    )
    // line79:调用解释器 intp 初始化 Spark
    def initializeSpark(): Unit = {
      if (!intp.reporter.hasErrors) {
        // `savingReplayStack` removes the commands from session history.
        savingReplayStack {
          initializationCommands.foreach(intp quietRun _)
        }
      } else {
        throw new RuntimeException(s"Scala $versionString interpreter encountered " +
                                   "errors during initialization")
      }
    }
    // line92:打印 Welcome 信息
    /** Print a welcome message */
    override def printWelcome(): Unit = {
      import org.apache.spark.SPARK_VERSION
      echo("""Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version %s
          /_/
             """.format(SPARK_VERSION))
      val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
        versionString, javaVmName, javaVersion)
      echo(welcomeMsg)
      echo("Type in expressions to have them evaluated.")
      echo("Type :help for more information.")
    }
    
    // line136:该方法有一些注释,大意是说目前 Scala 2.12 的解释器 ILoop 会首先打印 Welcome 信息,并且暂时没有更改的方式,但是 Spark 想首先打印 Spark URL,与之前的 Spark 版本保持一致,所以就复用了 ILoop 里面的 process 方法的大部分代码,以符合先打印 Spark URL 再打印 Welcome 信息的逻辑
    override def process(settings: Settings): Boolean = {
    
      ...
        
      // line141:SplashLoop 实现了 Runnable 接口,并且内部有一个 Thread 对象封装了自己,所以在后面可以调用 start 方法启动
      /** Reader to use before interpreter is online. */
      def preLoop = {
        val sr = SplashReader(newReader) { r =>
          in = r
          in.postInit()
        }
        in = sr
        SplashLoop(sr, prompt)
      }
    
      // line153:SplashLoop 创建之后在 Scala 解释器中初始化 SparkSession 对象 spark 和 SparkContext 对象 sc
      def loopPostInit(): Unit = mumly {
        ...
        initializeSpark()
        ...
      }
      
      ...
      
      // line201:启动解释器
      def startup(): String = withSuppressedSettings {
        // 初始化 Scala 解释器 SplashLoop 对象
        // let them start typing
        val splash = preLoop
    
        // while we go fire up the REPL
        try {
          // don't allow ancient sbt to hijack the reader
          savingReader {
            createInterpreter()
          }
          intp.initializeSynchronous()
    
          val field = classOf[ILoop].getDeclaredFields.filter(_.getName.contains("globalFuture")).head
          field.setAccessible(true)
          field.set(this, Future successful true)
    
          if (intp.reporter.hasErrors) {
            echo("Interpreter encountered errors during initialization!")
            null
          } else {
            // line221:SplashLoop 对象创建之后初始化 SparkSession 对象 spark 和 SparkContext 对象 sc
            loopPostInit()
            // line222:打印 Welcome 信息
            printWelcome()
            // line222:启动 SplashLoop,等待用户输入代码
            splash.start()
    
            // line225:读取用户输入的代码并返回,停止 SplashLoop 线程
            val line = splash.line           // what they typed in while they were waiting
            if (line == null) {              // they ^D
              try out print Properties.shellInterruptedString
              finally closeInterpreter()
            }
            line
          }
        } finally splash.stop()
      }
    
      this.settings = settings
      startup() match {
        case null => false
        case line =>
          // line239:解释执行用户输入的代码,执行完之后等待下一次的输入
          try loop(line) match {
            case LineResults.EOF => out print Properties.shellInterruptedString
            case _ =>
          }
        catch AbstractOrMissingHandler()
        finally closeInterpreter()
        true
      }
    }
    

    scala.tools.nsc.interpreter.ILoop

    为了搞清楚解释器是怎么运行的,还是需要看一下 scala.tools.nsc.interpreter.ILoop 的代码,主要关注上面执行的 loop 方法。

    • 通过 processLine 方法解释执行输入的代码,具体的执行逻辑就不细究了,有兴趣的朋友可以自行探索
    • 成功之后继续解释下一次的输入
    • 如果执行出错就返回 ERR

    文件:${maven-repository}/org/scala-lang/scala-compiler/2.12.10/scala-compiler-2.12.10.jar!/scala/tools/nsc/interpreter/ILoop.class

    // line482:这是个递归方法,通过 processLine 方法解释执行输入的代码,成功之后继续解释下一次的输入
    @tailrec final def loop(line: String): LineResult = {
      import LineResults._
      if (line == null) EOF
      else if (try processLine(line) catch crashRecovery) loop(readOneLine())
      else ERR
    }
    

    Summary

    所以 Spark shell 的流程是这样的

    • spark-shell 脚本调用了 spark-submit 脚本,启动程序 org.apache.spark.repl.Main
    • org.apache.spark.repl.Main 中会初始化解释器 org.apache.spark.repl.SparkILoop,其中会初始化 SparkSession 对象 sparkSparkContext 对象 sc,所以后面输入的代码可以直接使用这两个对象
    • 之后借助 Scala 中的解释器 scala.tools.nsc.interpreter.ILoop 来对输入的代码进行解释执行

    相关文章

      网友评论

          本文标题:002 Spark shell 是怎么一回事

          本文链接:https://www.haomeiwen.com/subject/koqjvktx.html