美文网首页Spark
SparkCore(二)

SparkCore(二)

作者: 八爪鱼下水 | 来源:发表于2021-03-30 14:28 被阅读0次

每种部署模式如何提交任务?

Client模式yarn

  • 本地通过Spark-Submit提交任务,执行Main进程,
    通过向ResourceManager申请启动ApplicationMaster,ResourceManager通知每一个NodeManager启动一个ApplicationMaster.

  • 然后ApplicationMaster 通知NodeManager 启动Executor进程.
    Executor进程反向注册给Driver

  • Driver 会一行一行的执行Spark代码,执行到某一个ACCTION之后,就触发一个JOB. 然后DAG 为每一个stoge 创建指定数量的task. TaskScheduler, 将每一个stoge 的task.分配到各个Excutor 上去执行算子函数.

Cluster模式yarn模式

  • Exexutor反向注册完成后,AppMaster就会知道自己有哪些资源可以用.然后执行JOB拆分Stoge,提交Stage的Task.进行Task调度,分配到各个Executor上面去执行.

两种模式的不同点是什么?

1.运行地点不同.

  1. yarn-client会导致本地负责Spark任务调取.

3.所以yarn-cluster模式下,效果更好一些,因为不用反向注册回来给本地机器.

RDD

Resilient Distributed Dataset (弹性化,分布式,数据集)

  • 不可变, 可分区, 可并行计算的集合

五大属性

1.分区列表

  • RDD都是由不同分区组成,分区可以按照0-1-2-3-依次标号

2.计算函数

  • 每一个RDD的分区都是由计算函数作用

3.依赖关系

  • 每一个RDD依赖于父RDD

4.Key-Value

  • 默认HashPartitioner
  • partitionBy()更改分区器和分区个数

5.位置优先性 : 移动计算不移动存储

RDD的依赖

1-为什么有依赖
  • Spark计算框架支持DAG,DAG前向执行计算,后向构建依赖关系
2-依赖有什么作用
  • 通过依赖关系来容错
  • 通过依赖构建血缘关系
  • 加快并行计算 宽依赖会发生shuffle,窄依赖会发生大数据量并行计算
3- 如何判断宽窄依赖?
  • 宽依赖,父RDD对应多个子RDD
  • 窄依赖,父RDD对应1个子RDD
  • 分区器跟分区个数一样也是窄依赖
    面试:groubyKey是窄依赖和宽依赖?
    大多数是宽依赖,分区个数跟分区数量一致

RDD的DAG

1-Spark的计算引擎关键组成.
2-DAG通过Action算子划分.
3-DAG对应就是Job.
4-DAG内部通过Shuffle算子划分Stages.

RDD的缓存

两种:Cache和Persist

  • Cache 默认调用的是 Persist

  • 缓存级别有很多:
    尽量选择内存,如果内存放不下可以尝试序列化,除非算子昂贵可以放在磁盘,如果容错恢复增加副本机制

  • rdd.cache - rdd.persist - rdd.unpersist

RDD的checkpoint

把RDD检查点放到hdfs中.斩断依赖关系,后续使用可以直接读取了.如果删除会报错.

  • 个Spark的Application下面有很多DAG有向无环图

  • 一个DAG对应的就是1个Job

  • 一个Job下面根据是否发生Shuffle或宽依赖划分Stage

  • 一个Stage下面有很多TaskSet,一个TaskSet就是一个RDD算子

  • 一个TaskSet下面有很多task

  • 每个Task都需要一个task线程执行每个分区的计算

广播变量

  • 1-广播变量,是在driver端定义的,executor端拥有副本,在executor端是不能改变广播变量的值

  • 2-广播变量获取的时候是从BlockManager中获取数据,如果本地没有从Driver端获取变量副本

  • 3-如何使用:sc.broadcast(map.collect)

 def main(args: Array[String]): Unit = {
    //申请资源

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripPrefix("$"))
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    //创建RDD
    val sc: SparkContext = spark.sparkContext
    //水果名称
    val kvFruit: RDD[(Int, String)] = sc
      .parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")))
    //把水果转换成map集合
    val collmap: collection.Map[Int, String] = kvFruit.collectAsMap

    //设置水果编号
    val fruitMap: RDD[Int] = sc.parallelize(Array(2,1))

    //需求:根据水果的编号查找水果的名称
    fruitMap.map(x=>collmap(x)).collect().foreach(println)

    //改进:如果水果很多,
    // 那么每个水果都需要拉取fruitMap变量进行对比得到水果名称
    val valueBroad: Broadcast[collection.Map[Int, String]] =
    sc.broadcast(collmap) //此处为 广播变量.
    //打印水果
    fruitMap.map(x=>valueBroad.value(x)).collect().foreach(println)

    spark.stop()
  }

累加器

  • 共享变量-累加器

  • scala的累加

  • rdd的累加问题

  • 累加器

sc.longAccumulator("acc_count")

 def main(args: Array[String]): Unit = {
    //申请资源

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripPrefix("$"))
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    var seq = Seq(1,2,3)

    //scala的加法
    var count = 0
    seq.map(x=> count += x)
   // println(count)

    //rdd的加法-0--为什么会出现现象?因为变量在driver端定义,
    // 将数据发送到executor执行累加,
    // 但是执行完累加后结果并没返回driver
    var counter2 = 0
    val rdd1: RDD[Int] = sc.parallelize(seq)
    rdd1.foreach(x => counter2 += x)
    //println(counter2)

    //提出了在driver端和executor端共享当前变量
    //累加器也是在action操作的时候触发
    val acc: Accumulator[Int] = sc.accumulator(0) //
    rdd1.foreach(x=>acc+=x)
   // println(acc)

    //使用不过期的方法
    val acc_count: LongAccumulator = sc.longAccumulator("acc_count")
    rdd1.foreach(x=>acc_count.add(x))
    println(acc_count)//LongAccumulator(id: 51, name: Some(acc_count), value: 6)
    println(acc_count.value)

  }

相关文章

  • SparkCore基础(二)

    * SparkCore基础(二) 继续探讨SparkCore,开门见山,不多废话。 SparkApplicatio...

  • SparkCore(二)

    每种部署模式如何提交任务? Client模式yarn 本地通过Spark-Submit提交任务,执行Main进程,...

  • SparkStreaming基础

    * SparkStreaming基础 打开之前构建好的Maven工程,如何构建?请参看SparkCore基础(二)...

  • 大数据入门:Spark RDD基础概念

    在Spark框架的核心部分,SparkCore作为平台基础通用执行引擎,重要性自是不必多说。而在SparkCore...

  • spark组成

    spark spark = SparkCore + SparkSq1 + SparkStreaming + MLl...

  • 通过案例对spark streaming透彻理解三板斧之一

    spark的核心是sparkcore,sparkstreaming,sparkgraph,sparkmlib其实是...

  • SparkSQL(四)

    什么是sparksql SparkCore撰写代码非常复杂,引入SparkSQL处理结构化数据 SparkSQL基...

  • SparkCore基础(一)

    * SparkCore基础(一) 学习Spark,首先要熟悉Scala,当然你说你会Python或者Java能不能...

  • SparkCore之Action

    reduce(func)案例 作用: 通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。...

  • sparkcore的join

    先准备两个rdd数据 1.a join b得到的结果 2.a leftOuterJoin b得到的结果 3.a r...

网友评论

    本文标题:SparkCore(二)

    本文链接:https://www.haomeiwen.com/subject/ynxghltx.html