美文网首页
Spark-算子-Active

Spark-算子-Active

作者: Demons_LLL | 来源:发表于2021-05-20 22:55 被阅读0次
    image.png

    reduce

      /**
       * 算出总价
       * 注意点:
       * 1. 函数中的curr参数,并不是value,而是一整条数据
       * 2. reduce 整体上的结果,只有一个
       */
      @Test
      def reduce(): Unit = {
        val tuple: (String, Double) = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
          .reduce((curr, agg) => ("总价", curr._2 + agg._2))
        println(tuple)
      }
    

    foreach

      @Test
      def foeach(): Unit = {
        sc.parallelize(Seq(1, 3, 2, 4, 5, 7, 6))
          .foreach(println(_))
      }
    

    countAndCountByKey

      /**
       * count 和 countByKey 的结果相差很远,每次调用Action 都会生成一个job,job会运行获取结果
       * 所以两个job 会有大量的log,其实就是在运行 job
       *
       * countByKey 的运算结果是 Map(电脑 -> 1, 手机 -> 2)
       * 数据倾斜,如果要解决数据倾斜的问题,是不是要先知道谁倾斜,通过countByKey可以查看对应数据的总数,从而解决数据倾斜的问题
       */
      @Test
      def countAndCountByKey(): Unit = {
        println(sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
          .count())
        println("================================== ")
        println(sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
          .countByKey())
      }
    

    take

      /**
       * take 和 takeSample都是获取数据,一个是 直接获取,一个是采样获取
       * frist 一般情况下,action会从所有分区获取数据,相对来说速度就比较慢,
       * frist 只是获取第一个元素,所以frist只会处理第一个分区,所以速度很快,无序处理所有数据
       */
      @Test
      def take(): Unit = {
        val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
        rdd.take(3).foreach(println(_))
        println(rdd.first())
        rdd.takeSample(withReplacement = true, num = 3).foreach(println(_))
      }
    

    相关文章

      网友评论

          本文标题:Spark-算子-Active

          本文链接:https://www.haomeiwen.com/subject/pfqdwktx.html