
2020-11-30-Spark-9(Spark-Core)

Author: 冰菓_ | Published 2020-12-06 08:01

Basic DAG concepts; partitioning operators and shuffle; implementing intersection and difference; implementing partitionBy.

1. Basic Concepts

A job corresponds to one DAG: each action triggers exactly one job.
A task is an instance of the Task class; one task is created per partition of a stage.
A TaskSet is the set of tasks belonging to one stage, so a TaskSet corresponds to a stage (see the sketch below).
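As a quick illustration of these terms, here is a minimal sketch (the object name and sample data are made up, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}

object DagDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dag-demo").setMaster("local[*]"))
    // 2 partitions => each stage of this job runs 2 tasks.
    val words = sc.makeRDD(List("a", "b", "a", "c"), 2)
    // reduceByKey introduces a shuffle (wide dependency), which splits the job
    // into two stages: a ShuffleMapStage before the shuffle and a ResultStage after it.
    val counts = words.map((_, 1)).reduceByKey(_ + _)
    // collect is an action: it triggers exactly one job, i.e. one DAG.
    counts.collect().foreach(println)
    // While the application runs, the job/stage/task breakdown can be inspected
    // in the Spark UI (by default at http://localhost:4040).
    sc.stop()
  }
}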

2. Intersection and Difference

Intersection

import org.apache.spark.{SparkConf, SparkContext}

object Test2 {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("test5").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5))
    val rdd2 = sc.makeRDD(List(4, 5, 6, 7, 8))
    // Pair each element with null so the elements themselves become keys for cogroup.
    val rdd = rdd2.map(data => (data, null))
    // cogroup collects the values from both RDDs per key; a key is in the
    // intersection only when both iterators are non-empty.
    val result = rdd1.map(data => (data, null)).cogroup(rdd).filter {
      case (_, (ws, vs)) => ws.nonEmpty && vs.nonEmpty
    }.keys
    result.collect.foreach(println)
    sc.stop()
  }
}
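For reference, Spark's built-in RDD.intersection works essentially the same way: both sides are mapped to (value, null) pairs, cogrouped, and only keys with non-empty iterators on both sides are kept, which is also why intersection de-duplicates its result. A one-line equivalent (placed inside the same main method, reusing rdd1 and rdd2 above) would be:

    rdd1.intersection(rdd2).collect.foreach(println) // prints 4 and 5 in some order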

Difference (subtract)
The underlying idea is to use hashing for de-duplication and lookup; the version below simulates this on the driver with a mutable HashMap.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.{immutable, mutable}

object Test4 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List(1, 2, 2, 3, 3, 4, 5))
    val rdd2 = sc.makeRDD(List(4, 5, 5, 6, 7))
    // Group both RDDs by element so each distinct element appears once as a key.
    val map1rdd1: RDD[(Int, Iterable[Null])] = rdd1.map(data => (data, null)).groupByKey()
    val map1rdd2: RDD[(Int, Iterable[Null])] = rdd2.map(data => (data, null)).groupByKey()
    // Collect rdd1 into a driver-side hash map, then remove every key that also occurs in rdd2.
    val hash = mutable.HashMap[Int, Iterable[Null]]()
    map1rdd1.collect.foreach(data => hash += data)
    map1rdd2.collect.foreach(data => hash.remove(data._1))
    println(hash)
    // Flatten the surviving (key, occurrences) entries back into individual elements,
    // so duplicates from rdd1 are preserved.
    val list: immutable.Seq[(Int, Null)] = hash.toList.flatMap(data => {
      data._2.map(data1 => (data._1, data1))
    })
    list.map(data => data._1).foreach(println)
    sc.stop()
  }
}
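The version above pulls both RDDs to the driver with collect, so it only works for small data sets. A distributed alternative (a minimal sketch, not the original author's code; it mirrors the cogroup idea used for the intersection above) keeps everything in RDDs:

import org.apache.spark.{SparkConf, SparkContext}

object SubtractDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("subtract-demo").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List(1, 2, 2, 3, 3, 4, 5))
    val rdd2 = sc.makeRDD(List(4, 5, 5, 6, 7))
    // Keep every occurrence from rdd1 whose value never appears in rdd2.
    // Unlike intersection, duplicates on the left side are preserved.
    val diff = rdd1.map(v => (v, null)).cogroup(rdd2.map(v => (v, null)))
      .filter { case (_, (left, right)) => left.nonEmpty && right.isEmpty }
      .flatMap { case (k, (left, _)) => left.map(_ => k) }
    diff.collect().foreach(println) // 1, 2, 2, 3, 3 in some order
    // The built-in rdd1.subtract(rdd2) produces the same result.
    sc.stop()
  }
}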

3. Partitioning Operators

// Re-implementing partitionBy with a ShuffledRDD
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Test3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("").setMaster("local[*]"))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("B", 2), ("D", 9), ("A", 1)), 2)
    // A ShuffledRDD redistributes the pairs according to the given partitioner;
    // here the target number of partitions is the current count plus one (3).
    val value: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd, new HashPartitioner(rdd.partitions.length + 1))
    println(value.partitions.length)
    sc.stop()
  }
}
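In everyday code the same repartitioning is done with the built-in partitionBy on a pair RDD, which internally builds a ShuffledRDD when the target partitioner differs from the current one. A one-line equivalent inside the same main method (reusing rdd from the example above):

    println(rdd.partitionBy(new HashPartitioner(rdd.partitions.length + 1)).partitions.length) // 3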

4. Driver and Executor
