美文网首页
001 从 spark-submit 说起

001 从 spark-submit 说起

作者: Whaatfor | 来源:发表于2020-10-30 20:52 被阅读0次

    所有脚本和代码以 Spark 3.0.1 为准,Scala 版本为 2.12~

    (作为一个强迫症患者,为什么不选 3.0.0,因为 3.0.1 是稳定版本)

    从 spark-submit 说起

    Spark 应用程序通常是用 spark-submit 脚本提交的,无论是本地模式还是集群模式。

    spark-submit

    • 如果需要,会通过脚本查找环境变量 SPARK_HOME
    • 调用 spark-class 脚本,这里传入的参数 org.apache.spark.deploy.SparkSubmit 在后面解析命令中会用到

    文件:${SPARK_HOME}/bin/spark-submit

     # line20:如果没有 SPARK_HOME 环境变量,就通过 bin 目录下的 find-spark-home 脚本查找
     if [ -z "${SPARK_HOME}" ]; then
       source "$(dirname "$0")"/find-spark-home
     fi
    
     # line25:禁用 Python 3.3+ 版本之后对字符串的随机哈希
     export PYTHONHASHSEED=0
     # line27:调用 spark-class 脚本
     exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
    

    find-spark-home

    • 如果安装了 PySpark,就用 Python 脚本查找 SPARK_HOME
    • 否则就将 SPARK_HOME 设置为当前目录的父目录

    文件:${SPARK_HOME}/bin/find-spark-home

     # line22:查找 SPARK_HOME 的 Python 脚本
     FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"
    
     # line25:如果环境变量已经设置了,就退出
     if [ ! -z "${SPARK_HOME}" ]; then
        exit 0
     elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
       # 如果所在的目录不存在 find_spark_home.py 文件,也就是说没有通过 pip 安装 PySpark,那就把 SPARK_HOME 环境变量设置为当前目录的父目录
       export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
     else
       # 如果通过 pip 安装 PySpark,就用脚本寻找 SPARK_HOME
       # 默认使用标准的 python 解释器,除非额外指定
       if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
          PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"
       fi
       export SPARK_HOME=$($PYSPARK_DRIVER_PYTHON "$FIND_SPARK_HOME_PYTHON_SCRIPT")
     fi
    

    spark-class

    • 加载环境变量
    • 生成 classpath
    • 通过 launcher 程序 org.apache.spark.launcher.Main 输出运行命令
    • 如果一切正常,执行生成的命令

    文件:${SPARK_HOME}/bin/spark-class

     # line20:如果没有 SPARK_HOME 环境变量,就通过 bin 目录下的 find-spark-home 脚本查找,跟上面一样,相当于二次检查
     if [ -z "${SPARK_HOME}" ]; then
       source "$(dirname "$0")"/find-spark-home
     fi
     
     # line24:配置 spark 环境
     . "${SPARK_HOME}"/bin/load-spark-env.sh
    
     # line27:查找 Java 环境,如果存在 JAVA_HOME 环境变量就采用,不存在就查找 java 命令,一般 Linux 系统都会有;如果还没找到那就不干了
     if [ -n "${JAVA_HOME}" ]; then
       RUNNER="${JAVA_HOME}/bin/java"
     else
       if [ "$(command -v java)" ]; then
         RUNNER="java"
       else
         echo "JAVA_HOME is not set" >&2
         exit 1
       fi
     fi
    
     # line39:查找 Spark 依赖,如果 ${SPARK_HOME}/jars 是个目录,就将其设置为环境变量 SPARK_JARS_DIR;否则就设置为 ${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars,这个感觉像针对源码启动的方式,如果从官网下载编译好的版本是没有 assembly 目录的
     if [ -d "${SPARK_HOME}/jars" ]; then
       SPARK_JARS_DIR="${SPARK_HOME}/jars"
     else
       SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
     fi
     # line45:如果 SPARK_JARS_DIR 不是一个目录,同时 "$SPARK_TESTING$SPARK_SQL_TESTING" 为空,就退出;否则将 classpath 设置为 "$SPARK_JARS_DIR/*"
     if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
       echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
       echo "You need to build Spark with the target \"package\" before running this program." 1>&2
       exit 1
     else
       LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
     fi
    
     # line53:如果 SPARK_PREPEND_CLASSES 存在,将构建目录添加到 classpath 中,可以忽略
     if [ -n "$SPARK_PREPEND_CLASSES" ]; then
       LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
     fi
    
     # line70:解析启动命令参数,这里启用一个 Java 程序来解析输入的参数,该程序会把解析后的启动参数写到标准输出,然后下面 76 行再把这些参数读进来构建真正的启动命令
     build_command() {
       "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
       printf "%d\0" $?
     }
    
     # line76:关闭 posix 模式,因为不支持进程替换;这里会把上面提到的 Java 进程的输出读进来解析,注意分隔符是 $'\0',这是从 Java 程序里输出的,具体的看后面对 org.apache.spark.launcher.Main 代码的解释;最终的命令存放在 CMD 数组里
     set +o posix
     CMD=()
     DELIM=$'\n'
     CMD_START_FLAG="false"
     while IFS= read -d "$DELIM" -r ARG; do
       if [ "$CMD_START_FLAG" == "true" ]; then
         CMD+=("$ARG")
       else
         if [ "$ARG" == $'\0' ]; then
           # Java 程序会先输出一个 '\0\n',用来标识开始输出命令参数
           DELIM=''
           CMD_START_FLAG="true"
         elif [ "$ARG" != "" ]; then
           echo "$ARG"
         fi
       fi
     done < <(build_command "$@")
    
     COUNT=${#CMD[@]}
     LAST=$((COUNT - 1))
     LAUNCHER_EXIT_CODE=${CMD[$LAST]}
    
     # line101:如果上面 Java 程序的返回值不是一个整数,就异常退出
     if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
       echo "${CMD[@]}" | head -n-1 1>&2
       exit 1
     fi
    
     # line106:如果返回值不是 0,异常退出
     if [ $LAUNCHER_EXIT_CODE != 0 ]; then
       exit $LAUNCHER_EXIT_CODE
     fi
     
     # line110:真正的启动命令,注意删除了最后一个元素,因为前面的 build_command 函数中最后还加了返回值进去,这里会把 0 干掉
     CMD=("${CMD[@]:0:$LAST}")
     exec "${CMD[@]}"
    
    • 例如,如果在我的机器上执行 ${SPARK_HOME}/bin/spark-shell --master local 命令,最终的 CMD 其实是 /Library/Java/JavaVirtualMachines/jdk1.8.0_261.jdk/Contents/Home/bin/java -cp ${SPARK_HOME}/conf/:/Users/fengjian/opt/spark-3.0.1-bin-hadoop3.2/jars/* -Dscala.usejavacp=true -Xmx1g org.apache.spark.deploy.SparkSubmit --master local --class org.apache.spark.repl.Main --name Spark shell spark-shell

    load-spark-env

    • 检查 SPARK_HOME
    • 加载 SPARK_CONF_DIR 目录下的 spark-env.sh 脚本,默认使用 ${SPARK_HOME}"/conf 作为 SPARK_CONF_DIR,脚本中声明的变量会被提升为环境变量
    • 设置 SPARK_SCALA_VERSION 环境变量

    文件:${SPARK_HOME}/bin/load-spark-env.sh

     # line25:真保险,第三次检查了
     if [ -z "${SPARK_HOME}" ]; then
       source "$(dirname "$0")"/find-spark-home
     fi
    
     # line29:如果环境变量 SPARK_ENV_LOADED 不存在,
     SPARK_ENV_SH="spark-env.sh"
     if [ -z "$SPARK_ENV_LOADED" ]; then
       export SPARK_ENV_LOADED=1
       # 如果 SPARK_CONF_DIR 环境变量不存在,使用 ${SPARK_HOME}/conf 作为 SPARK_CONF_DIR
       export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
         # 找到 SPARK_CONF_DIR 目录下的 spark-env.sh 的脚本
       SPARK_ENV_SH="${SPARK_CONF_DIR}/${SPARK_ENV_SH}"
       if [[ -f "${SPARK_ENV_SH}" ]]; then
         # 将 spark-env.sh 脚本中声明的变量都暴露为环境变量
         set -a
         . ${SPARK_ENV_SH}
         set +a
       fi
     fi
     
     # line47:设置 SPARK_SCALA_VERSION 环境变量
     export SPARK_SCALA_VERSION=2.12
    

    默认情况下 ${SPARK_HOME}/conf/spark-env.sh 脚本不存在,有一个 ${SPARK_HOME}/conf/spark-env.sh.template 的范例脚本,里面包含了很多可以配置的环境变量名称和相应的用法,当然都是注释掉的,在需要使用的时候拷贝一份命名为 ${SPARK_HOME}/conf/spark-env.sh,再将需要的环境变量暴露出来就可以生效了。

    org.apache.spark.launcher.Main

    • 通过 buildCommand 方法解析命令行参数,该方法的细节这里就不展开了,感兴趣的朋友可以自行探索
    • 输出解析后的命令行参数,在 spark-class 脚本的 76 行会接收这些参数,如果一切正常,作为真正的启动脚本执行

    文件:${spark-project}/launcher/src/main/java/org/apache/spark/launcher/Main.java

    // line51
    public static void main(String[] argsArray) throws Exception {
      checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
    
      List<String> args = new ArrayList<>(Arrays.asList(argsArray));
      String className = args.remove(0);
      
      // line57:可以看到能够通过 SPARK_PRINT_LAUNCH_COMMAND 环境变量来打印解析后的命令
      boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
      Map<String, String> env = new HashMap<>();
      List<String> cmd;
      // line60:如果第一个参数时 org.apache.spark.deploy.SparkSubmit,说明是通过 spark-submit 脚本提交的,这也是最常用的
      if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
        ...
      } else {
        ...
      }
    
      if (isWindows()) {
        // line91:如果是 Windows 操作系统,就直接打印命令
        System.out.println(prepareWindowsCommand(cmd, env));
      } else {
        // line94:打印一个 NULL 和一个换行符来告诉 spark-class 脚本接下来会输出真正的命令行运行参数
        System.out.println('\0');
    
        // line97:使用 NULL 作为分隔符是因为在 bash 中该付汇不可能是作为一个参数传递;打印命令项供 spark-class 脚本使用
        List<String> bashCmd = prepareBashCommand(cmd, env);
        for (String c : bashCmd) {
          System.out.print(c);
          System.out.print('\0');
        }
      }
    }
    

    Summary

    那么现在整个流程比较清晰了:

    • spark-submit 脚本调用了 spark-class 脚本,并传递参数 org.apache.spark.deploy.SparkSubmit
    • spark-class 脚本会加载需要的环境变量,生成 classpath,并通过 org.apache.spark.launcher.Main 生成真正运行的命令行脚本
    • 启动 JVM 进程

    相关文章

      网友评论

          本文标题:001 从 spark-submit 说起

          本文链接:https://www.haomeiwen.com/subject/uprprktx.html