一、核心组件
spark是一个典型的master-slave主从架构,有一些核心组件:Driver和Executor、Master和Worker、ApplicationMaster。对于standalone独立部署模式下,Driver和Executor负责计算这块,Master和Worker负责资源调度这块,ApplicationMaster负责协调资源和计算,避免计算和资源直接耦合,起到中间层的作用。
各组件具体介绍
-
Driver
:创建spark上下文对象环境的应用程序就称为Driver驱动器,用于执行spark任务中的main方法,将用户程序转换为作业job,负责实际代码的执行工作。 -
Executor
:执行器用于接收任务并执行任务,并将结果返回给Driver。 -
Master
:负责资源的调度和分配,并对集群进行监控,类似于YARN模式下的ResourceManager。 -
Worker
:由Master分配资源对数据进行并行的处理和计算,类似于YARN模式下的NodeManager。 -
ApplicationMaster
:用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整个任务的执行及跟踪状态。
理解并发和并行的概念
比如机器cpu是单核的,那你程序设置抢占多核操作时其实只是相当于并发,多个线程抢占单核cpu,
那假设机器cpu是多核的,那你程序设置抢占多核操作时(每个任务执行占用一核,有3个任务分别抢占3个核同时执行),这才称作是并行,多条机器同时计算任务。
有向无环图(DAG):体现的是一种有向依赖关系,但不能形成闭环依赖,在程序当中主要起到调度作用。
二、三大数据结构
-
RDD:弹性分布式数据集,代表着弹性的、不可变、可分区、里面的元素可并行计算的集合。是Spark最基本的数据处理模型,同时也是最小的计算单元,每一个RDD封装着数据和计算逻辑,当复杂操作的时候可以多个RDD关联起来从而实现RDD的复杂逻辑。
RDD数据处理操作
,我们可以来类比下IO操作,IO操作过程中体现了装饰者设计模式,相当于是功能的扩展,而RDD操作也是一样,比如RDD经过一系列算子来对数据进行处理,一层层包装扩展功能。当然也有区别,RDD的数据只有在调用了action行动算子的时候才会真正执行业务逻辑操作,之前的转换都是功能的扩展,还有RDD也不会中间存储数据,只封装着计算逻辑,不像IO操作还会来个缓冲区临时存储下数据。
RDD分区(partitions)
,分区的多少涉及这个RDD进行并行计算的粒度,每个RDD分区的计算操作都在一个单独的任务中被执行,创建RDD时若无指定分区数则默认按分配到的资源CPU核个数作为RDD分区值。分区内的计算是有序的,不同分区的计算是无序的。
RDD优先位置(preferredLocations)
,Driver把计算任务发送给与数据相近的Executor去,更加节省网络传输数据,提升效率。
RDD依赖关系(dependencies)
,在spark中有窄依赖和宽依赖,每个父RDD的分区最多只能被一个子RDD的一个分区使用称为窄依赖,相反,每个父RDD的分区可以被多个子RDD的分区使用称为宽依赖。
image.png
RDD分区计算(compute)
,compute函数都是在对迭代器进行复合,不需要保存每次计算的结果。
RDD分区函数(partitioner)
,一种是HashPartitioner哈希分区,一种是RangePartitioner区域分区,且partitioner属性只存在于k-v键值对类型的RDD,若非键值对的RDD调用此属性则返回None。 -
累加器:分布式共享只写变量
累加器是用来把Executor端变量信息聚合到Driver端,在Driver程序定义的变量,在Executor端的每个Task会得到这变量的新的副本,每个task计算刷新这个副本值后会返回给Driver端进行合并值。一个分区对应着一个Task,如果以下sumAcc不声明为累加器的话,是没办法得到sumAcc=10的结果的,因为sumAcc在不同的Executor执行会被不断地覆盖,但是Executor不会把sumAcc结果返回给Driver,所以在Driver打印sumAcc的值就永远是0。累加器执行调用.jpg
package com.meizu.xiaojiang.spark.core.SparkCode03
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object SumAcc {
def main(args: Array[String]): Unit = {
//创建spark上下文环境
val config = new SparkConf().setMaster("local[*]").setAppName("SumAcc")
val sc = new SparkContext(config)
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
//系统累加器,spark默认就提供简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
rdd.foreach(
num => {
//调用累加方法
sumAcc.add(num)
}
)
//获取累加汇总值
println("sumAcc:"+sumAcc.value)
sc.stop()
}
}
-
广播变量:分布式共享只读变量
用来高效分发较大的对象。
sc.broadcast(data) //封装广播变量,实现分布式共享读取,避免每个task都各自获取同一个闭包数据。
三、源码深入时刻
-
1、基于内存中创建RDD,分区的设定和分区数据的切分规则
操作分析:点击makeRDD方法,跳到SparkContext.scala
源码,可以看到makeRDD方法实际上也是调用parallelize方法image.png 点击parallelize方法,看到new了一个ParalleCollectionRDD类
image.png 每个RDD都有一个分区列表,进入
ParalleCollectionRDD.scala
可以看到这个getPartitions方法,这个方法是获取分区数并按分区值把数据切分为Array数组image.png 还有一个spark框架的slice方法,在这里就封装着分区数据的切分规则了。源码如下:
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
seq match {
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
先判断分区数是否小于1,小于则抛出异常,定义了个positions方法封装着计算规则,seq是代表着数据,把seq去做模式匹配,根据数据的场景选择对应策略调用positions方法划分数据,以最后一种场景来演示计算如下:
比如RDD数据是1,2,3,4,8,指定分区值是2
根据positions方法的切分规则,until表示左开右闭,即循环0,1 计算后返回一个tuple元组(start, end)
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
演示切分规则
0
start=0*5/2=0
end=(0+1)*5/2=2
返回结果 tuple【0,2) 则第一个数据1和第二个数据2 在分区文件1
1
start=1*5/2=2
end=(1+1)*5/2=5
返回结果 tuple【2,5) 则第三个数据3和第四个数据4和第五个数据8 在分区文件2

总结:源码实现中分析,scala中的
slice(from,until)
方法是获取from 到 until之间的元素,这是一个左开右闭的过程,即不包含until的位置,而spark也封装了一个slice方法,对scala的slice方法进一步扩展,实现把RDD按照分区规则分配到不同的分区中
-
2、基于外部文件创建RDD,分区的设定和分区数据的切分规则
操作分析:点击textFile方法,进入SparkContext.scala
文件,看到有个minPartitions最小分区数量image.png 点击defaultMinPartitions进入看到如下最小分区数量规则
image.png 以上即两者比较,取最小值作为最小分区数量,但注意,这里的最小分区数量并不一定代表最后分区文件就是等于这个最小分区数量
image.png 从上图可以看出基于外部文件创建RDD方式的spark读取文件采用的是hadoop读取文件方式,一行行读取数据,同时遵从hadoop文件的分区规则,点击TextInputFormat类进去找到继承的父类FileInputFormat,找到getSplits方法,看里面的逻辑,如下,是用数据的总字节数除以分区值得到一个分区存放的字节数,然后hadoop文件分区规则规定剩余字节数若超过分区所分配的字节数的10%,则需要重分配一个分区
image.png spark读取文件创建RDD的方式,其分区数据的分配是根据数据分区的偏移量范围来做计算的
比如文件数据是
Hello Spark
Hello Scala
Hello Hbase
Hello Hadoop
Hello Spark
注意里面每行数据末尾有\r\n字符,所以最好是找到这个文件确认查看文件属性字节数,我确认了有64个字节
网友评论