概述
基于Spark 2.4.0
每个Spark应用都具备三大角色:
客户端(Client):负责收集用户的配置,准备Spark应用的运行环境,并启动Spark应用。
驱动器(Driver):负责中央协调,调度各个分布式工作节点。
执行器(Executor):工作节点,负责数据的处理。
Spark可运行在本地,也可运行在远程集群。不同的运行模式这三个角色的分布也不一样。
如何提交一个Spark应用?
提交运行一个Spark应用的命令如下:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
详细参数如下:
--master 表示要连接的集群管理器,spark://host:port, mesos://host:port, yarn, k8s://https://host:port, local
--deploy-mode client:选择在本地启动驱动器程序(Client和Driver两个角色都在同一个进程)
cluster:Driver程序会被传输到集群中的一台工作节点机器(例如Yarn的NodeManager)上启动
默认是client模式
--class 指定应用程序的启动类(main方法),仅针对Java或Scala语言编写的应用
--name 指定应用程序的名称
--jars 指定需要上传并放到应用的CLASSPATH中的JAR包,多个jar包用逗号分隔
--files 指定需要放到应用工作目录中的文件的列表。这个参数一般用来放需要分发到各节点的数据文件
--packages 指定以逗号分隔的jar的maven坐标列表,包含在Driver和Executor类路径中。将搜索本地maven repo,然后搜索maven
central以及--repositories给出的任何其他远程存储库。坐标的格式应为groupId:artifactId:version
--exclude-packages 为了避免冲突而指定需要排除的某些package
--repositories 指定以逗号分隔的远程repository列表,用来搜索--packages给出的maven坐标
--py-files 指定需要添加到PYTHONPATH中的文件的列表。其中可以包含 .py、.egg以及.zip文件
--conf 指定配置Spark应用的参数,格式为:PROP=VALUE
--properties-file 指定要加载的配置文件,默认为./conf/spark-defaults.conf
--driver-memory 指定Driver进程的堆内存大小,默认1024M
--driver-java-options 指定启动Driver进程的额外参数
--driver-library-path 指定传给Driver进程的额外的库路径
--driver-class-path 指定传给Driver进程的额外的类路径
--executor-memory 指定Executor进程的堆内存大小,默认1G
--driver-cores 指定分配给Driver进程的CPU核数,--deploy-mode为cluster时有效
--executor-cores 指定分配给每个Executor进程的CPU核数,--master为yarn或spark://host:port时有效
--queue 指定任务的队列,仅仅--master为yarn时有效。默认default
--num-executors 指定启动的Executor进程的个数,仅仅--master为yarn时有效。默认2
Spark任务的运行模式有哪些?
一、本地运行模式
--master为local时,代表Spark任务运行的本地。此时Client、Driver和Executor角色都在同一个进程,即spark-submit进程。

./spark-submit \
--master local \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar
local的形式有多种:
local:在本地启动一个工作线程来处理数据。
local[K]:在本地启动K个工作线程来并行处理数据。
local[K,F]:在本地启动K个工作线程来处理数据,并允许F次失败。
local[*]:根据本地服务器的CPU核数来启动工作线程,比如4核服务器,就启动4个工作线程。
local[*,F]:根据本地服务器的CPU核数来启动工作线程,并允许F次失败。比如4核服务器,就启动4个工作线程。
二、分布式运行模式
在分布式环境下,Spark集群采用的是主/从结构。这时候就涉及到集群资源的管理了。根据资源管理器的不同可分为以下两类:
1、使用Spark自带的资源管理器
要使用Spark自带的资源管理器,必须先启动Spark资源管理器的Master进程和Worker进程。
# 启动Spark资源管理器的Master节点
./start-master.sh
# 启动Spark资源管理器的Worker节点
./start-slave.sh -h hostname url:master
向资源管理器提交运行Spark应用:在spark-submit命令中指定--master <master-url>
为--master spark://master-host:master-port
。
./spark-submit \
--master spark://master-host:master-port \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar
根据--deploy-mode
参数的不同,又可以细分成两类:
①--deploy-mode client
:Client和Driver角色在同一个进程,这个进程运行在提交任务的服务器上。Executor为独立的进程,运行在Worker管理的服务器上。
./spark-submit \
--master spark://master-host:master-port \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar
②--deploy-mode cluster
:Client角色、Driver角色和Executor角色均为独立的进程。Client进程运行在提交任务的服务器上,Driver和Executor进程均运行在Woker管理的服务器上。
./spark-submit \
--master spark://master-host:master-port \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar
2、使用外部的资源管理器(Yarn、Mesos)
这里以Yarn为例:

在spark-env.sh
中指定hadoop的配置文件export HADOOP_CONF_DIR=/etc/hadoop/conf
后,向Yarn提交运行Spark应用:在spark-submit命令中指定--master
为--master yarn
。
./spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar
根据--deploy-mode
参数的不同,又可以细分成两类:
①--deploy-mode client
:Client和Driver角色在同一个进程,这个进程运行在提交任务的服务器上。Executor为独立的进程,运行在NodeManager管理的服务器上。
./spark-submit \
--master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar
②--deploy-mode cluster
:Client角色、Driver角色和Executor角色均为独立的进程。Client进程运行在提交任务的服务器上,Driver和Executor进程均运行在NodeManager管理的服务器上。
./spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
../examples/jars/spark-examples_2.11-2.4.0.jar
Spark任务提交流程是什么?
spark-submit
命令负责提交Spark应用,对应的启动类为org.apache.spark.deploy.SparkSubmit#main
。
# org.apache.spark.deploy.SparkSubmit#submit
// 根据用户传入的参数,准备应用提交的环境
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// 启动应用
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
1、准备了哪些环境?
①childArgs
:传递给子进程的参数。
②childClasspath
:需要加载到CLASSPATH下的jar包列表。
③sparkConf
:Spark应用的配置信息。
④childMainClass
:我们自己编写的Spark应用的main方法所在的类。
2、启动流程是什么?
①将资源jar包加载到CLASSPATH下。
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
②根据childMainClass生成相应的SparkApplication实例,我们自己编写的mainClass无法继承SparkApplication[Spark]
,最后生成的实例为JavaMainApplication
。然后启动。
mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
app.start(childArgs.toArray, sparkConf)
③启动时将SparkConf中的key-value全部写入System
中。(Java的System类)
val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) =>
sys.props(k) = v
}
这里会解决一个疑问:这里有一个SparkConf对象。我们自己编写Spark应用的时候也会new一个SparkConf对象。这两个conf对象怎么传递信息?
val sparkConf = new SparkConf().setAppName("SparkDemo_1").setMaster("local")
val sparkContext = new SparkContext(sparkConf)
上面说到spark-submit生成的SparkConf中的key-vaue会被写入到System
中,当我们自己new SparkConf()
时会从System
中加载出所有的key-value信息。
# class SparkConf(loadDefaults: Boolean)
def this() = this(true)
if (loadDefaults) {
loadFromSystemProperties(false)
}
④最后执行我们自己编写的main方法。
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
mainMethod.invoke(null, args)}
网友评论