每种部署模式如何提交任务?
Client模式yarn
-
本地通过Spark-Submit提交任务,执行Main进程,
通过向ResourceManager申请启动ApplicationMaster,ResourceManager通知每一个NodeManager启动一个ApplicationMaster. -
然后ApplicationMaster 通知NodeManager 启动Executor进程.
Executor进程反向注册给Driver -
Driver 会一行一行的执行Spark代码,执行到某一个ACCTION之后,就触发一个JOB. 然后DAG 为每一个stoge 创建指定数量的task. TaskScheduler, 将每一个stoge 的task.分配到各个Excutor 上去执行算子函数.
Cluster模式yarn模式
- Exexutor反向注册完成后,AppMaster就会知道自己有哪些资源可以用.然后执行JOB拆分Stoge,提交Stage的Task.进行Task调度,分配到各个Executor上面去执行.
两种模式的不同点是什么?
1.运行地点不同.
- yarn-client会导致本地负责Spark任务调取.
3.所以yarn-cluster模式下,效果更好一些,因为不用反向注册回来给本地机器.
RDD
Resilient Distributed Dataset (弹性化,分布式,数据集)
- 不可变, 可分区, 可并行计算的集合
五大属性
1.分区列表
- RDD都是由不同分区组成,分区可以按照0-1-2-3-依次标号
2.计算函数
- 每一个RDD的分区都是由计算函数作用
3.依赖关系
- 每一个RDD依赖于父RDD
4.Key-Value
- 默认HashPartitioner
- partitionBy()更改分区器和分区个数
5.位置优先性 : 移动计算不移动存储
RDD的依赖
1-为什么有依赖
- Spark计算框架支持DAG,DAG前向执行计算,后向构建依赖关系
2-依赖有什么作用
- 通过依赖关系来容错
- 通过依赖构建血缘关系
- 加快并行计算 宽依赖会发生shuffle,窄依赖会发生大数据量并行计算
3- 如何判断宽窄依赖?
- 宽依赖,父RDD对应多个子RDD
- 窄依赖,父RDD对应1个子RDD
- 分区器跟分区个数一样也是窄依赖
面试:groubyKey是窄依赖和宽依赖?
大多数是宽依赖,分区个数跟分区数量一致
RDD的DAG
1-Spark的计算引擎关键组成.
2-DAG通过Action算子划分.
3-DAG对应就是Job.
4-DAG内部通过Shuffle算子划分Stages.
RDD的缓存
两种:Cache和Persist
-
Cache 默认调用的是 Persist
-
缓存级别有很多:
尽量选择内存,如果内存放不下可以尝试序列化,除非算子昂贵可以放在磁盘,如果容错恢复增加副本机制 -
rdd.cache - rdd.persist - rdd.unpersist
RDD的checkpoint
把RDD检查点放到hdfs中.斩断依赖关系,后续使用可以直接读取了.如果删除会报错.
-
个Spark的Application下面有很多DAG有向无环图
-
一个DAG对应的就是1个Job
-
一个Job下面根据是否发生Shuffle或宽依赖划分Stage
-
一个Stage下面有很多TaskSet,一个TaskSet就是一个RDD算子
-
一个TaskSet下面有很多task
-
每个Task都需要一个task线程执行每个分区的计算
广播变量
-
1-广播变量,是在driver端定义的,executor端拥有副本,在executor端是不能改变广播变量的值
-
2-广播变量获取的时候是从BlockManager中获取数据,如果本地没有从Driver端获取变量副本
-
3-如何使用:sc.broadcast(map.collect)
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripPrefix("$"))
.master("local[*]")
.getOrCreate()
import spark.implicits._
//创建RDD
val sc: SparkContext = spark.sparkContext
//水果名称
val kvFruit: RDD[(Int, String)] = sc
.parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")))
//把水果转换成map集合
val collmap: collection.Map[Int, String] = kvFruit.collectAsMap
//设置水果编号
val fruitMap: RDD[Int] = sc.parallelize(Array(2,1))
//需求:根据水果的编号查找水果的名称
fruitMap.map(x=>collmap(x)).collect().foreach(println)
//改进:如果水果很多,
// 那么每个水果都需要拉取fruitMap变量进行对比得到水果名称
val valueBroad: Broadcast[collection.Map[Int, String]] =
sc.broadcast(collmap) //此处为 广播变量.
//打印水果
fruitMap.map(x=>valueBroad.value(x)).collect().foreach(println)
spark.stop()
}
累加器
-
共享变量-累加器
-
scala的累加
-
rdd的累加问题
-
累加器
sc.longAccumulator("acc_count")
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripPrefix("$"))
.master("local[*]")
.getOrCreate()
import spark.implicits._
val sc: SparkContext = spark.sparkContext
var seq = Seq(1,2,3)
//scala的加法
var count = 0
seq.map(x=> count += x)
// println(count)
//rdd的加法-0--为什么会出现现象?因为变量在driver端定义,
// 将数据发送到executor执行累加,
// 但是执行完累加后结果并没返回driver
var counter2 = 0
val rdd1: RDD[Int] = sc.parallelize(seq)
rdd1.foreach(x => counter2 += x)
//println(counter2)
//提出了在driver端和executor端共享当前变量
//累加器也是在action操作的时候触发
val acc: Accumulator[Int] = sc.accumulator(0) //
rdd1.foreach(x=>acc+=x)
// println(acc)
//使用不过期的方法
val acc_count: LongAccumulator = sc.longAccumulator("acc_count")
rdd1.foreach(x=>acc_count.add(x))
println(acc_count)//LongAccumulator(id: 51, name: Some(acc_count), value: 6)
println(acc_count.value)
}
网友评论