Spark(一):任务提交

作者: b91cbec6a902 | 来源:发表于2019-03-15 14:53 被阅读15次

概述

基于Spark 2.4.0

每个Spark应用都具备三大角色:
客户端(Client):负责收集用户的配置,准备Spark应用的运行环境,并启动Spark应用。
驱动器(Driver):负责中央协调,调度各个分布式工作节点。
执行器(Executor):工作节点,负责数据的处理。
Spark可运行在本地,也可运行在远程集群。不同的运行模式这三个角色的分布也不一样。

如何提交一个Spark应用?

提交运行一个Spark应用的命令如下:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

详细参数如下:

--master            表示要连接的集群管理器,spark://host:port, mesos://host:port, yarn, k8s://https://host:port, local

--deploy-mode       client:选择在本地启动驱动器程序(Client和Driver两个角色都在同一个进程)
                    cluster:Driver程序会被传输到集群中的一台工作节点机器(例如Yarn的NodeManager)上启动
                    默认是client模式

--class             指定应用程序的启动类(main方法),仅针对Java或Scala语言编写的应用

--name              指定应用程序的名称

--jars              指定需要上传并放到应用的CLASSPATH中的JAR包,多个jar包用逗号分隔

--files             指定需要放到应用工作目录中的文件的列表。这个参数一般用来放需要分发到各节点的数据文件

--packages          指定以逗号分隔的jar的maven坐标列表,包含在Driver和Executor类路径中。将搜索本地maven repo,然后搜索maven 
                    central以及--repositories给出的任何其他远程存储库。坐标的格式应为groupId:artifactId:version

--exclude-packages  为了避免冲突而指定需要排除的某些package

--repositories      指定以逗号分隔的远程repository列表,用来搜索--packages给出的maven坐标

--py-files          指定需要添加到PYTHONPATH中的文件的列表。其中可以包含 .py、.egg以及.zip文件

--conf              指定配置Spark应用的参数,格式为:PROP=VALUE

--properties-file   指定要加载的配置文件,默认为./conf/spark-defaults.conf

--driver-memory     指定Driver进程的堆内存大小,默认1024M

--driver-java-options 指定启动Driver进程的额外参数

--driver-library-path 指定传给Driver进程的额外的库路径

--driver-class-path 指定传给Driver进程的额外的类路径

--executor-memory   指定Executor进程的堆内存大小,默认1G

--driver-cores      指定分配给Driver进程的CPU核数,--deploy-mode为cluster时有效

--executor-cores    指定分配给每个Executor进程的CPU核数,--master为yarn或spark://host:port时有效

--queue             指定任务的队列,仅仅--master为yarn时有效。默认default

--num-executors     指定启动的Executor进程的个数,仅仅--master为yarn时有效。默认2

Spark任务的运行模式有哪些?

一、本地运行模式

--master为local时,代表Spark任务运行的本地。此时Client、Driver和Executor角色都在同一个进程,即spark-submit进程。

Spark本地模式
./spark-submit \
--master local \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar

local的形式有多种:
local:在本地启动一个工作线程来处理数据。
local[K]:在本地启动K个工作线程来并行处理数据。
local[K,F]:在本地启动K个工作线程来处理数据,并允许F次失败。
local[*]:根据本地服务器的CPU核数来启动工作线程,比如4核服务器,就启动4个工作线程。
local[*,F]:根据本地服务器的CPU核数来启动工作线程,并允许F次失败。比如4核服务器,就启动4个工作线程。

二、分布式运行模式

在分布式环境下,Spark集群采用的是主/从结构。这时候就涉及到集群资源的管理了。根据资源管理器的不同可分为以下两类:

1、使用Spark自带的资源管理器

要使用Spark自带的资源管理器,必须先启动Spark资源管理器的Master进程和Worker进程。

# 启动Spark资源管理器的Master节点
./start-master.sh

# 启动Spark资源管理器的Worker节点
./start-slave.sh -h hostname url:master

向资源管理器提交运行Spark应用:在spark-submit命令中指定--master <master-url>--master spark://master-host:master-port

./spark-submit \
--master spark://master-host:master-port \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar

根据--deploy-mode参数的不同,又可以细分成两类:
--deploy-mode client:Client和Driver角色在同一个进程,这个进程运行在提交任务的服务器上。Executor为独立的进程,运行在Worker管理的服务器上。

./spark-submit \
--master spark://master-host:master-port \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar

--deploy-mode cluster:Client角色、Driver角色和Executor角色均为独立的进程。Client进程运行在提交任务的服务器上,Driver和Executor进程均运行在Woker管理的服务器上。

./spark-submit \
--master spark://master-host:master-port \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar

2、使用外部的资源管理器(Yarn、Mesos)

这里以Yarn为例:

Spark On Yarn运行模式

spark-env.sh 中指定hadoop的配置文件export HADOOP_CONF_DIR=/etc/hadoop/conf后,向Yarn提交运行Spark应用:在spark-submit命令中指定--master--master yarn

./spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar

根据--deploy-mode参数的不同,又可以细分成两类:
--deploy-mode client:Client和Driver角色在同一个进程,这个进程运行在提交任务的服务器上。Executor为独立的进程,运行在NodeManager管理的服务器上。

./spark-submit \
--master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar

--deploy-mode cluster:Client角色、Driver角色和Executor角色均为独立的进程。Client进程运行在提交任务的服务器上,Driver和Executor进程均运行在NodeManager管理的服务器上。

./spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar

Spark任务提交流程是什么?

spark-submit命令负责提交Spark应用,对应的启动类为org.apache.spark.deploy.SparkSubmit#main

# org.apache.spark.deploy.SparkSubmit#submit

// 根据用户传入的参数,准备应用提交的环境
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

// 启动应用
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)

1、准备了哪些环境?

childArgs:传递给子进程的参数。
childClasspath:需要加载到CLASSPATH下的jar包列表。
sparkConf:Spark应用的配置信息。
childMainClass:我们自己编写的Spark应用的main方法所在的类。

2、启动流程是什么?

①将资源jar包加载到CLASSPATH下。

for (jar <- childClasspath) {
  addJarToClasspath(jar, loader)
}

②根据childMainClass生成相应的SparkApplication实例,我们自己编写的mainClass无法继承SparkApplication[Spark],最后生成的实例为JavaMainApplication。然后启动。

mainClass = Utils.classForName(childMainClass)

val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }
  new JavaMainApplication(mainClass)
}
app.start(childArgs.toArray, sparkConf)

③启动时将SparkConf中的key-value全部写入System中。(Java的System类)

val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) =>
  sys.props(k) = v
}

这里会解决一个疑问:这里有一个SparkConf对象。我们自己编写Spark应用的时候也会new一个SparkConf对象。这两个conf对象怎么传递信息?

val sparkConf = new SparkConf().setAppName("SparkDemo_1").setMaster("local")
val sparkContext = new SparkContext(sparkConf)

上面说到spark-submit生成的SparkConf中的key-vaue会被写入到System中,当我们自己new SparkConf()时会从System中加载出所有的key-value信息。

# class SparkConf(loadDefaults: Boolean)
def this() = this(true)
if (loadDefaults) {
loadFromSystemProperties(false)
}

④最后执行我们自己编写的main方法。

val mainMethod = klass.getMethod("main", new Array[String](0).getClass)

mainMethod.invoke(null, args)}

相关文章

网友评论

    本文标题:Spark(一):任务提交

    本文链接:https://www.haomeiwen.com/subject/qhzbdqtx.html