
2020-11-25-Spark-4(Spark-Core)

Author: 冰菓_ | Published 2020-12-04 08:04

    RDD operators: flatMap, glom, groupBy, filter, sample, distinct, coalesce

    Open questions

    Why is the number of downstream partitions in a shuffle determined by the upstream side, and how is it decided?
    After the upstream's partially aggregated data has landed on HDFS, will the upstream read it again?
    How do you get the number of partitions of an RDD?
    How is local (map-side) aggregation carried out?
    What is the difference between reduceByKey and groupByKey?
    What kind of data structure is CompactBuffer?
    How the aggregator of ShuffledRDD is overridden inside these operators
    How is the distinct operator implemented under the hood?
    How to write the method-overriding code shown in the course
    Answers to the case studies

    1. flatMap operator examples (flattening)

        // imports shared by the snippets in this post
        import org.apache.spark.rdd.RDD
        import org.apache.spark.{SparkConf, SparkContext}

        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd: RDD[List[Int]] = sc.makeRDD(List[List[Int]](List(1, 2), List(3, 4)))
        // flatten the nested lists into a single RDD of Ints
        val result: RDD[Int] = rdd.flatMap(data => data)
        result.collect.foreach(println)
        sc.stop()
    
        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd: RDD[String] = sc.makeRDD(List[String]("hadoop yarn", "spark scala"))
        // split each string on spaces and flatten into individual words
        val result: RDD[String] = rdd.flatMap(data => data.split(" "))
        result.collect.foreach(println)
        sc.stop()
    
        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
        // pattern matching: flatten nested lists, wrap bare elements in a List
        val result: RDD[Any] = rdd.flatMap {
          case list: List[_] => list
          case num => List(num)
        }
        result.collect.foreach(println)
        sc.stop()
    

    2. glom operator demo (packs each partition into an array)

        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 3, 4, 5, 6))
        // glom packs the elements of each partition into an Array
        val result: RDD[Array[Int]] = rdd.glom()
        result.collect.foreach(data => println(data.mkString(",")))
        sc.stop()
    
        // sum of the maximum value of each partition
        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 3, 4, 5, 6), 2)
        val glomrdd: RDD[Array[Int]] = rdd.glom()
        // take the max of each partition's array, then sum the maxima on the driver
        val result: RDD[Int] = glomrdd.map(data => {
          data.max
        })
        println(result.collect.sum)
        sc.stop()
    

    3. groupBy (the key type is whatever type the grouping function returns)

        val sc = new SparkContext(new SparkConf().setAppName("test2").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[String]("hadoop,yarn,mapreduce", "java,scala,node.js"), 2)
        val flatrdd: RDD[String] = rdd.flatMap(data => data.split(","))
        // key is charAt(0)
        // val result: RDD[(Char, Iterable[String])] = flatrdd.groupBy(data => data.charAt(0))
        // key is true / false
        val res: RDD[(Boolean, Iterable[String])] = flatrdd.groupBy(data => data.contains("s"))
        res.collect.foreach(println)
        sc.stop()
    
        // imports needed for date parsing
        import java.text.SimpleDateFormat
        import java.util.Date

        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        // read a log file and count the number of clicks per hour
        val rdd = sc.textFile("src/main/resources/aa.txt")
        val value: RDD[(String, Int)] = rdd.map(data => {
          val fields: Array[String] = data.split(" ")
          // extract the timestamp field
          val timeField = fields(3)
          // dd = day, MM = month, HH = hour (0-23), mm = minute, ss = second
          val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
          val date: Date = format.parse(timeField)
          val hourFormat = new SimpleDateFormat("HH")
          val hour: String = hourFormat.format(date)
          (hour, 1)
        }).groupBy(_._1).map(data => (data._1, data._2.size))
        value.collect.foreach(println)
        sc.stop()
    

    4. filter operator example (filtering can cause data skew)
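
    A minimal sketch (not from the original post) of a typical filter call: keep only the even numbers. filter only drops elements and never moves data between partitions, so after heavy filtering some partitions can end up nearly empty while others stay large (data skew); coalesce is often applied afterwards.

        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 3, 4, 5, 6, 7, 8), 4)
        // keep only the elements for which the predicate returns true;
        // the partitioning is unchanged, so partitions may become unbalanced
        val result: RDD[Int] = rdd.filter(data => data % 2 == 0)
        result.collect.foreach(println)
        sc.stop()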

    5. sample operator example (Bernoulli and Poisson sampling)

    withReplacement: whether a sampled element is put back into the pool. true means sampling with replacement (Poisson sampling, where fraction is the expected number of times each element is selected), so the sample may contain duplicates; false means sampling without replacement (Bernoulli sampling, where fraction is the probability of keeping each element).

        val sc = new SparkContext(new SparkConf().setAppName("test3").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        val result = rdd.sample(
          withReplacement = true, // Poisson sampling: an element may appear more than once
          fraction = 0.4          // expected number of times each element is selected
        )
        result.collect.foreach(println)
        sc.stop()
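
    For comparison, a minimal sketch (not from the original post) of the without-replacement case, where fraction is a probability:

        val sc = new SparkContext(new SparkConf().setAppName("test3").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
        // Bernoulli sampling: each element is kept independently with probability 0.4,
        // so no element can appear more than once in the result
        val bernoulli = rdd.sample(withReplacement = false, fraction = 0.4)
        bernoulli.collect.foreach(println)
        sc.stop()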
    

    6. How the distinct operator differs under the hood from Scala's distinct

    Deduplication in Spark (from the RDD.distinct source):

        partitioner match {
          case Some(_) if numPartitions == partitions.length =>
            mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
          case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
        }
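
    A minimal sketch (not from the original post) showing that, in the common case, distinct boils down to the reduceByKey branch above:

        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 2, 3, 3, 3))
        // built-in operator
        rdd.distinct().collect.foreach(println)
        // hand-written equivalent of the reduceByKey branch: tag each element with null,
        // keep one value per key, then drop the null
        rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1).collect.foreach(println)
        sc.stop()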
    

    Deduplication in Scala collections (backed by a HashSet); decompiled source, truncated:

        default Object distinct() {
            boolean isImmutable = this instanceof scala.collection.immutable.Seq;
            if (isImmutable && this.lengthCompare(1) <= 0) {
                return this.repr();
            } else {
                Builder b = this.newBuilder();
                HashSet seen = new HashSet();
                Iterator it = this.iterator();
                boolean different = false;
    

    7. coalesce (shrinking the number of partitions)

    (Shrinks the number of partitions. Without shuffle = true there is no shuffle: coalesce simply merges existing partitions, which can cause data skew. To increase the number of partitions you must pass shuffle = true; for that Spark provides the repartition operator.)

        val sc = new SparkContext(new SparkConf().setAppName("test3").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 3, 4, 1, 4, 7, 8, 9), 3)
        // shrink from 3 partitions to 2; shuffle = true redistributes the data evenly
        rdd.coalesce(2, true).saveAsTextFile("src/aa")
        sc.stop()
    

    8. repartition (increasing the number of partitions)
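
    A minimal sketch (not from the original post). Under the hood repartition(n) simply calls coalesce(n, shuffle = true), so it always shuffles and can be used to increase (or decrease) the partition count:

        val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
        val rdd = sc.makeRDD(List[Int](1, 2, 3, 4, 5, 6), 2)
        // go from 2 partitions to 4; repartition always performs a shuffle
        val result = rdd.repartition(4)
        println(result.getNumPartitions)
        sc.stop()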
