
Spark Core - Programming Basics

Author: 奋斗的蛐蛐 | Published 2021-03-17 11:23

    RDD Programming

    What Is an RDD

    RDDs are the cornerstone of Spark and the core abstraction behind its data processing. RDD is an abstract class representing an immutable, partitioned collection whose elements can be computed in parallel.

    An RDD (Resilient Distributed Dataset) is the central concept in Spark: a fault-tolerant, distributed dataset that can be processed in parallel.

    什么是RDD.png
    An RDD has five characteristics:
    1. A list of partitions
    2. A compute function that computes each partition
    3. A list of dependencies on other RDDs (wide or narrow)
    4. Optionally, a partitioner for key-value RDDs
    5. Optionally, a list of preferred locations for each partition
    • A set of partitions, the basic units of the dataset. Each partition is processed by one task, which determines the granularity of parallelism. The number of partitions can be specified when the RDD is created; otherwise a default value is used.
    • A function that computes each partition. Spark computes RDDs partition by partition, and every RDD implements a compute function for this purpose. compute composes iterators and does not need to store intermediate results.
    • Dependencies between RDDs. Every transformation produces a new RDD, so RDDs form a pipeline-like chain of dependencies (lineage). When some partitions are lost, Spark can recompute just those partitions from the lineage instead of recomputing everything.
    • For key-value RDDs, an optional Partitioner. Spark ships two partitioning functions: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs can have a Partitioner; for other RDDs the partitioner is None. The Partitioner determines the number of partitions of the RDD itself as well as the number of partitions of the parent RDD's shuffle output.
    • A list storing the preferred location of each partition. For an HDFS file this is the location of the block backing each partition. Following the principle of "move the computation, not the data", Spark tries to schedule each task where the data block it processes is stored.
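    These five characteristics map directly onto members of the abstract RDD class. A simplified sketch of that shape (signatures abbreviated for illustration; see RDD.scala in the Spark source for the real definitions):

    abstract class RDD[T] {
      // 1. the list of partitions
      protected def getPartitions: Array[Partition]
      // 2. compute one partition
      def compute(split: Partition, context: TaskContext): Iterator[T]
      // 3. dependencies on parent RDDs (wide or narrow)
      protected def getDependencies: Seq[Dependency[_]]
      // 4. optional partitioner, only meaningful for key-value RDDs
      val partitioner: Option[Partitioner] = None
      // 5. optional preferred locations for each partition
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    }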

    RDD Characteristics

    Partitioned

    An RDD is logically partitioned; the data of each partition exists only abstractly. At computation time a compute function produces each partition's data: if the RDD was built from a file system, compute reads the data from that file system; if the RDD was derived from another RDD, compute applies the transformation logic to the parent RDD's data.


    分区.png
    Read-only

    RDDs are read-only. To change the data in an RDD, you can only build a new RDD on top of the existing one, i.e. transform one RDD into another. This is done through a rich set of operators (map, filter, union, join, reduceByKey, ...), unlike MapReduce where you can only write a Map and a Reduce.

    只读.png

    RDD operators fall into two classes:

    • transformation: transforms an RDD into a new RDD; lazily evaluated
    • action: triggers the computation of an RDD, returning a result or saving the RDD to a file system (a small example follows this list)
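    A quick spark-shell illustration of the lazy behaviour: the map below only records lineage, and nothing runs until the action is called.

    scala> val nums = sc.parallelize(1 to 10)   // creates the RDD, no job yet
    scala> val doubled = nums.map(_ * 2)        // transformation: only recorded
    scala> doubled.count                        // action: triggers the job
    res0: Long = 10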
    Dependencies

    RDDs are transformed by operators, and the resulting RDD carries all the information needed to derive it from its parent RDDs. This lineage maintained between RDDs is also called dependency. There are two kinds:

    • Narrow dependency: partitions correspond one-to-one between RDDs (1:1 or n:1)
    • Wide dependency: each partition of the child RDD depends on every partition of the parent RDD (n:m); a shuffle occurs
    依赖.png
    Caching

    An RDD can be cached at a configurable storage level (memory, disk, etc.).

    If an application uses the same RDD multiple times, the RDD can be cached. Only the first computation derives the partition data from the lineage; afterwards, wherever the RDD is used, the data is read directly from the cache instead of being recomputed from the lineage, which speeds up reuse.
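    A minimal caching sketch (the input path reuses the uaction.dat file mentioned later in this article): persist with an explicit storage level, or call cache() for the default MEMORY_ONLY.

    import org.apache.spark.storage.StorageLevel

    val parsed = sc.textFile("data/uaction.dat").filter(_.nonEmpty)
    parsed.persist(StorageLevel.MEMORY_AND_DISK)   // or parsed.cache()
    parsed.count()   // first action: computes the partitions and fills the cache
    parsed.count()   // later actions read from the cache instead of recomputing the lineage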

    缓存.png
    checkpoint

    RDD lineage naturally provides fault tolerance: when a partition of an RDD fails or is lost, it can be rebuilt from the lineage. But for long-running iterative applications the lineage keeps growing; if an error occurs late in the iteration, rebuilding through a very long lineage hurts performance. RDDs therefore support checkpointing, which persists the data to durable storage and cuts the earlier lineage: a checkpointed RDD no longer needs to know its parent RDDs, because it can read its data directly from the checkpoint.
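    A minimal checkpoint sketch (the HDFS directory below is an assumption; adjust it to your cluster). The checkpoint is only materialized when the next action runs:

    sc.setCheckpointDir("hdfs://linux121:9000/spark/checkpoint")   // assumed directory
    val rdd = sc.parallelize(1 to 100).map(_ * 2)
    rdd.cache()        // recommended, otherwise the checkpoint job recomputes the RDD
    rdd.checkpoint()   // marks the RDD; the data is written on the next action
    rdd.count()        // triggers the job and the checkpoint; earlier lineage is cut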

    Spark Programming Model

    Spark编程模型.png
    • An RDD represents a data object
    • RDDs are transformed by calling methods on the object
    • Finally, results are displayed or written to an external data source
    • RDD transformation operators, called Transformations, are lazy (deferred execution)
    • RDD operations only run when an Action operator is encountered

    To use Spark, you write a Driver program, which is submitted to the cluster to run.

    • The Driver defines one or more RDDs and invokes operators on them
    • Workers execute the per-partition computation tasks
    Spark编程模型2.png

    Creating RDDs

    SparkContext

    SparkContext is the first class used when writing a Spark program and the main entry point of Spark. It handles all interaction with the cluster. If you regard the Spark cluster as the server side, the Driver is the client, and SparkContext is the core of that client: it is Spark's external interface, exposing Spark's functionality to the caller. SparkContext is used to connect to the Spark cluster and to create RDDs, accumulators and broadcast variables. In spark-shell a SparkContext has already been created and can be used directly; in a Spark Driver program, creating the SparkContext is the first thing to do.
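    Outside spark-shell the Driver program creates the SparkContext itself. A minimal sketch (the app name, master and input path are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    object WordCountDriver {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WordCountDriver").setMaster("local[*]")
        val sc = new SparkContext(conf)

        val lines = sc.textFile("data/wc.txt")   // placeholder path
        val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
        counts.collect().foreach(println)

        sc.stop()
      }
    }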

    RDD创建.png
    Creating an RDD from a Collection

    Creating an RDD from a collection is mainly used for testing. Spark provides the following functions: parallelize, makeRDD, range.

    /** Distribute a local Scala collection to form an RDD.
       *
       * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
       * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
       * modified collection. Pass a copy of the argument to avoid this.
       * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
       * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
       * @param seq Scala collection to distribute
       * @param numSlices number of partitions to divide the collection into
       * @return RDD representing distributed collection
       */
    def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
      assertNotStopped()
      new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    }
    
    /**
       * Creates a new RDD[Long] containing elements from `start` to `end`(exclusive), increased by
       * `step` every element.
       *
       * @note if we need to cache this RDD, we should make sure each partition does not exceed limit.
       *
       * @param start the start value.
       * @param end the end value.
       * @param step the incremental step
       * @param numSlices number of partitions to divide the collection into
       * @return RDD representing distributed range
       */
    def range(
      start: Long,
      end: Long,
      step: Long = 1,
      numSlices: Int = defaultParallelism): RDD[Long] = withScope {
      assertNotStopped()
      // when step is 0, range will run infinitely
      require(step != 0, "step cannot be 0")
      val numElements: BigInt = {
        val safeStart = BigInt(start)
        val safeEnd = BigInt(end)
        if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
          (safeEnd - safeStart) / step
        } else {
          // the remainder has the same sign with range, could add 1 more
          (safeEnd - safeStart) / step + 1
        }
      }
      parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex { (i, _) =>
        val partitionStart = (i * numElements) / numSlices * step + start
        val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
        def getSafeMargin(bi: BigInt): Long =
        if (bi.isValidLong) {
          bi.toLong
        } else if (bi > 0) {
          Long.MaxValue
        } else {
          Long.MinValue
        }
        val safePartitionStart = getSafeMargin(partitionStart)
        val safePartitionEnd = getSafeMargin(partitionEnd)
    
        new Iterator[Long] {
          private[this] var number: Long = safePartitionStart
          private[this] var overflow: Boolean = false
    
          override def hasNext =
          if (!overflow) {
            if (step > 0) {
              number < safePartitionEnd
            } else {
              number > safePartitionEnd
            }
          } else false
    
          override def next() = {
            val ret = number
            number += step
            if (number < ret ^ step < 0) {
              // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
              // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
              // back, we are pretty sure that we have an overflow.
              overflow = true
            }
            ret
          }
        }
      }
    }
    
    /** Distribute a local Scala collection to form an RDD.
       *
       * This method is identical to `parallelize`.
       * @param seq Scala collection to distribute
       * @param numSlices number of partitions to divide the collection into
       * @return RDD representing distributed collection
       */
    def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
      parallelize(seq, numSlices)
    }
    
    scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(1 until 100)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
    
    scala> rdd1.collect
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
    
    scala> rdd2.collect
    res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
    
    scala> rdd1.getNumPartitions
    res2: Int = 5
    
    scala> rdd1.partition
    partitioner   partitions
    
    scala> rdd1.partitions.length
    res3: Int = 5
    
    scala> val rdd3 = sc.range(1,100,2)
    rdd3: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:24
    
    scala> rdd3.collect
    res4: Array[Long] = Array(1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99)
    
    scala> val rdd4 = sc.parallelize(1 to 100,3)
    rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
    
    scala> rdd3.getNumPartitions
    res5: Int = 5
    
    scala> val rdd3 = sc.range(1,100,2,1)
    rdd3: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[6] at range at <console>:24
    
    scala> rdd3.getNumPartitions
    res6: Int = 1
    

    rdd.collect gathers the entire RDD back to the Driver; do not use it in production, as it can cause a Driver OOM.

    Creating an RDD from a File System

    textFile() loads data from a file system to create an RDD. It takes a file URI as its argument, which can be:

    • A local file system path
      • For a local file on a Spark cluster, make sure the file exists under the same path on every node
    • An HDFS address
    • An Amazon S3 address
    // load data from the local file system
    val lines = sc.textFile("file:///root/data/wc.txt")
    // load data from a distributed file system (HDFS)
    val lines = sc.textFile("hdfs://linux121:9000/user/root/data/uaction.dat")
    val lines = sc.textFile("/user/root/data/uaction.dat")
    val lines = sc.textFile("data/uaction.dat")
    
    Creating an RDD from Another RDD

    This is essentially transforming one RDD into another. See 3.5 Transformation for details.

    Transformation

    RDD operators fall into two classes:

    • Transformation: transforms an RDD; execution is deferred (lazy) and a new RDD is returned.
    • Action: triggers the computation of the RDD and either returns a result (Int, Double, collection, ...) or saves it to an external system; it does not return a new RDD.
    • Be able to tell Transformations and Actions apart precisely.

    Every Transformation produces a new RDD that feeds the next transformation. Transformed RDDs are lazily evaluated: the whole chain only records the trace of the transformations without actually computing anything. Only when an Action is encountered does real computation take place, starting from the head of the lineage and performing the physical transformations.

    Transformation.png

    Common Transformations: see the official documentation

    Common Transformation Operators (1)
    • map(func): applies func to every element of the dataset and returns a new RDD.

    • filter(func): applies func to every element; the elements for which func returns true form a new RDD.

    • flatMap(func): like map, and consistent with Scala's flatMap; each input element is mapped to zero or more output elements.

    • mapPartitions(func): similar to map, but where map applies func to each element, mapPartitions applies func to a whole partition. For an RDD with N elements and M partitions (N >> M), map's function is called N times, while mapPartitions's function is called M times, processing one partition per call.

    • mapPartitionsWithIndex(func): like mapPartitions, but additionally provides the partition index.

    All of these are narrow dependencies.

    scala> val rdd1 = sc.parallelize(1 to 100)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> rdd1.collect
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
    
    scala> rdd1.map(_ * 2).collect
    res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200)
    
    scala> rdd1.filter(_ > 50).collect
    res2: Array[Int] = Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
    
    scala> val rdd2 = sc.textFile("/azkaban-wc/wc.txt")
    rdd2: org.apache.spark.rdd.RDD[String] = /azkaban-wc/wc.txt MapPartitionsRDD[4] at textFile at <console>:24
    
    scala> rdd2.flatMap(_.split(" ")).collect
    res4: Array[String] = Array(hadoop, mapreduce, yarn, hdfs, hadoop, mapreduce, mapreduce, yarn, lagou, lagou, lagou)
    
    ## mapPartitions
    scala> val rdd1 = sc.makeRDD(1 to 20,3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
    
    scala> rdd1.getNumPartitions
    res0: Int = 3
    
    scala> rdd1.mapPartitions(as => Iterator(as.toArray.mkString(":"))).collect
    res5: Array[String] = Array(1:2:3:4:5:6, 7:8:9:10:11:12:13, 14:15:16:17:18:19:20)
    
    scala> rdd1.mapPartitions(iter => Iterator(iter.toList)).collect
    res8: Array[List[Int]] = Array(List(1, 2, 3, 4, 5, 6), List(7, 8, 9, 10, 11, 12, 13), List(14, 15, 16, 17, 18, 19, 20))
    
    scala> rdd1.mapPartitions(iter => Iterator(iter.toArray.toBuffer)).collect
    res9: Array[scala.collection.mutable.Buffer[Int]] = Array(ArrayBuffer(1, 2, 3, 4, 5, 6), ArrayBuffer(7, 8, 9, 10, 11, 12, 13), ArrayBuffer(14, 15, 16, 17, 18, 19, 20))
    
    
    scala> rdd1.mapPartitionsWithIndex((i,iter) => Iterator(i + "|" + iter.toArray.toBuffer)).collect
    res13: Array[String] = Array(0|ArrayBuffer(1, 2, 3, 4, 5, 6), 1|ArrayBuffer(7, 8, 9, 10, 11, 12, 13), 2|ArrayBuffer(14, 15, 16, 17, 18, 19, 20))
    
    
    /**
    * The first parameter is a function that turns one iterator into another iterator.
    */
    def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        preservesPartitioning)
    }
    
    def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
        preservesPartitioning)
    }
    

    map vs. mapPartitions

    • map: processes one element at a time
    • mapPartitions: processes one partition at a time; the partition's data can only be released after the whole partition has been processed, so it is easy to run into OOM when memory is tight
    • Best practice: when memory is sufficient, prefer mapPartitions for better efficiency (see the sketch below)
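    A typical use of mapPartitions is to pay an expensive setup cost once per partition instead of once per element, as map(func) would imply. A sketch under that assumption; openConnection and save below are hypothetical placeholders, not real APIs:

    rdd.mapPartitions { iter =>
      val conn = openConnection()                        // hypothetical: opened once per partition
      val saved = iter.map(r => save(conn, r)).toList    // materialize before closing the connection
      conn.close()
      saved.iterator
    }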
    Common Transformation Operators (2)
    • groupBy(func): groups elements by the return value of func, putting values with the same key into one iterator

    • glom(): turns each partition into an array, producing a new RDD[Array[T]]

    • sample(withReplacement, fraction, seed): sampling operator; draws roughly a fraction of the data using the given random seed. withReplacement indicates whether sampling is done with replacement (true) or without (false)

    • distinct([numTasks]): removes duplicates and returns a new RDD; the optional numTasks argument changes the number of partitions

    • coalesce(numPartitions): reduces the number of partitions, without a shuffle

    • repartition(numPartitions): increases or reduces the number of partitions, with a shuffle

    • sortBy(func, [ascending], [numTasks]): applies func to the data and sorts by the result

    Wide-dependency operators: groupBy, distinct, repartition, sortBy

    scala> val rdd1 = sc.makeRDD(1 to 10)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24
    
    scala> rdd1
    res21: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24
    
    scala> rdd1.collect
    res22: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> rdd1.mapPartitions(iter => iter.map(_ * 10))
    res24: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at mapPartitions at <console>:26
    
    scala> res24.collect
    res25: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
    
    scala> val rdd2 = rdd1.groupBy(_ % 3)
    rdd2: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[18] at groupBy at <console>:25
    
    scala> rdd2.collect
    res27: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(6, 9, 3)), (1,CompactBuffer(7, 1, 4, 10)), (2,CompactBuffer(2, 5, 8)))
    
    scala> rdd2.mapValues(_.map(_*2)).collect
    res28: Array[(Int, Iterable[Int])] = Array((0,List(6, 12, 18)), (1,List(2, 20, 8, 14)), (2,List(10, 4, 16)))
    
    scala> val rdd1 = sc.makeRDD(1 to 20,3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> rdd1.glom
    res0: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[1] at glom at <console>:26
    
    scala> res0.collect
    res1: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6), Array(7, 8, 9, 10, 11, 12, 13), Array(14, 15, 16, 17, 18, 19, 20))
    
    scala> rdd1.collect
    res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    
    scala> val rdd1 = sc.makeRDD(1 to 111)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
    
    scala> rdd1.glom.collect
    res3: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22), Array(23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44), Array(45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), Array(89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111))
    
    scala> rdd1.glom.map(_.map(_ * 10)).collect
    res4: Array[Array[Int]] = Array(Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200, 210, 220), Array(230, 240, 250, 260, 270, 280, 290, 300, 310, 320, 330, 340, 350, 360, 370, 380, 390, 400, 410, 420, 430, 440), Array(450, 460, 470, 480, 490, 500, 510, 520, 530, 540, 550, 560, 570, 580, 590, 600, 610, 620, 630, 640, 650, 660), Array(670, 680, 690, 700, 710, 720, 730, 740, 750, 760, 770, 780, 790, 800, 810, 820, 830, 840, 850, 860, 870, 880), Array(890, 900, 910, 920, 930, 940, 950, 960, 970, 980, 990, 1000, 1010, 1020, 1030, 1040, 1050, 1060, 1070, 1080, 1090, 1100, 1110))
    
    scala> val rdd1 = sc.makeRDD(1 to 111,2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24
    
    scala> rdd1.glom
    res5: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[7] at glom at <console>:26
    
    scala> rdd1.glom.collect
    res6: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55), Array(56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111))
    
    scala> rdd1.glom.getNumPartitions
    res7: Int = 2
    
    scala> val rdd3 = rdd1.glom.map(_.sliding(10,10))
    rdd3: org.apache.spark.rdd.RDD[Iterator[Array[Int]]] = MapPartitionsRDD[11] at map at <console>:25
    
    scala> rdd3.collect
    res8: Array[Iterator[Array[Int]]] = Array(<iterator>, <iterator>)
    # glom gathers each partition into an array and we then iterate over those arrays; the _ below is an Array[Int], which sliding splits into groups of 10 elements with a step of 10
    scala> val rdd3 = rdd1.glom.map(_.sliding(10,10).toArray)
    rdd3: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[13] at map at <console>:25
    scala> rdd3.collect
    res10: Array[Array[Array[Int]]] = Array(Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33, 34, 35, 36, 37, 38, 39, 40), Array(41, 42, 43, 44, 45, 46, 47, 48, 49, 50), Array(51, 52, 53, 54, 55)), Array(Array(56, 57, 58, 59, 60, 61, 62, 63, 64, 65), Array(66, 67, 68, 69, 70, 71, 72, 73, 74, 75), Array(76, 77, 78, 79, 80, 81, 82, 83, 84, 85), Array(86, 87, 88, 89, 90, 91, 92, 93, 94, 95), Array(96, 97, 98, 99, 100, 101, 102, 103, 104, 105), Array(106, 107, 108, 109, 110, 111)))
    
    scala> rdd3.getNumPartitions
    res11: Int = 2
    ## build an array of 1 to 33
    scala> val rddx = (1 to 33).toArray
    rddx: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33)
    ## split the array with a sliding window of 10 elements and the default step of 1: the first array is 1~10, the second is 2~11, and so on
    scala> rddx.sliding(10).toArray
    res14: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11), Array(3, 4, 5, 6, 7, 8, 9, 10, 11, 12), Array(4, 5, 6, 7, 8, 9, 10, 11, 12, 13), Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14), Array(6, 7, 8, 9, 10, 11, 12, 13, 14, 15), Array(7, 8, 9, 10, 11, 12, 13, 14, 15, 16), Array(8, 9, 10, 11, 12, 13, 14, 15, 16, 17), Array(9, 10, 11, 12, 13, 14, 15, 16, 17, 18), Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(12, 13, 14, 15, 16, 17, 18, 19, 20, 21), Array(13, 14, 15, 16, 17, 18, 19, 20, 21, 22), Array(14, 15, 16, 17, 18, 19, 20, 21, 22, 23), Array(15, 16, 17, 18, 19, 20, 21, 22, 23, 24), Array(16, 17, 18, 19, 20, 21, 22, 23, 24, 25), Array(17, 18, 19, 20, 21, 22, 2...
    ## split the array with a sliding window of 10 elements and a step of 5
    scala> rddx.sliding(10,5).toArray
    res15: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(6, 7, 8, 9, 10, 11, 12, 13, 14, 15), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(16, 17, 18, 19, 20, 21, 22, 23, 24, 25), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(26, 27, 28, 29, 30, 31, 32, 33))
    ## split the array with a sliding window of 10 elements and a step of 10
    scala> rddx.sliding(10,10).toArray
    res16: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33))
    
    # sampling: first build an RDD
    scala> val rdd1 = sc.makeRDD(1 to 200)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
    # sample the RDD: the first argument controls whether an element may be drawn more than once (true = with replacement); the second is the fraction of data to draw, which is approximate rather than exact; the last is the random seed: with the same seed the sample is always identical, and omitting it makes each sample fully random
    scala> val rdd2 = rdd1.sample(true,0.1,10)
    rdd2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[2] at sample at <console>:25
    
    scala> rdd2.collect
    res1: Array[Int] = Array(2, 4, 20, 20, 37, 60, 67, 81, 96, 106, 107, 156, 164, 166, 167, 172, 173, 174, 174, 185, 187, 188, 195, 197)
    
    scala> rdd1.sample(true,0.1,10).collect
    res2: Array[Int] = Array(2, 4, 20, 20, 37, 60, 67, 81, 96, 106, 107, 156, 164, 166, 167, 172, 173, 174, 174, 185, 187, 188, 195, 197)
    
    scala> rdd1.sample(true,0.1,101).collect
    res3: Array[Int] = Array(28, 32, 45, 53, 60, 94, 102, 120, 157, 162, 167, 170, 170, 183, 185, 200)
    
    scala> rdd1.sample(true,0.1).collect
    res4: Array[Int] = Array(13, 14, 36, 51, 55, 81, 83, 84, 88, 106, 106, 120, 127, 142, 145, 149, 158, 176, 188, 190)
    
    scala> rdd1.sample(true,0.1).collect
    res5: Array[Int] = Array(1, 12, 35, 39, 45, 57, 63, 80, 99, 107, 113, 117, 145, 160, 160, 161, 162, 170, 175, 185, 185, 186, 200)
    
    scala> rdd1.sample(false,0.1,10).collect
    res6: Array[Int] = Array(2, 5, 8, 10, 15, 16, 31, 34, 35, 37, 59, 71, 75, 81, 90, 108, 154, 164, 169, 179, 181, 183)
    
    scala> rdd1.sample(false,0.1,10).collect
    res7: Array[Int] = Array(2, 5, 8, 10, 15, 16, 31, 34, 35, 37, 59, 71, 75, 81, 90, 108, 154, 164, 169, 179, 181, 183)
    
    scala> rdd1.sample(false,0.1).collect
    res8: Array[Int] = Array(31, 40, 44, 67, 70, 107, 108, 112, 117, 125, 154, 163, 170, 174, 185, 187, 199)
    
    ## test distinct: create a Random and generate random numbers
    scala> val random = scala.util.Random
    random: util.Random.type = scala.util.Random$@6022cf7e
    
    
    scala> val arr = (1 to 30).map(_ => random.nextInt(15))
    arr: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 6, 7, 4, 5, 1, 4, 1, 8, 13, 14, 1, 0, 4, 1, 13, 7, 4, 12, 14, 12, 7, 4, 10, 4, 13, 5, 5, 2, 14)
    
    
    scala> val rdd = sc.makeRDD(arr)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:26
    
    scala> rdd.collect
    res0: Array[Int] = Array(1, 6, 7, 4, 5, 1, 4, 1, 8, 13, 14, 1, 0, 4, 1, 13, 7, 4, 12, 14, 12, 7, 4, 10, 4, 13, 5, 5, 2, 14)
    
    scala> rdd.distinct.collect
    res1: Array[Int] = Array(0, 10, 5, 1, 6, 7, 12, 2, 13, 8, 4, 14)
    
    
    ## decrease or increase the number of partitions
    # coalesce (with the default shuffle = false) can only decrease the number of partitions, not increase it
    scala> val rdd2 = rdd1.distinct.coalesce(2)
    rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[13] at coalesce at <console>:25
    
    scala> rdd2.getNumPartitions
    res4: Int = 2
    
    scala> val rdd3 = rdd1.distinct.repartition(2)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at repartition at <console>:25
    
    scala> rdd3.getNumPartitions
    res5: Int = 2
    
    scala> val rdd3 = rdd1.distinct.repartition(6)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at repartition at <console>:25
    
    scala> rdd3.getNumPartitions
    res6: Int = 6
    
    ## sorting: ascending by default, false for descending
    scala> rdd1.sortBy(x => x).collect
    res11: Array[Int] = Array(0, 1, 1, 1, 1, 2, 4, 4, 5, 5, 5, 6, 7, 7, 7, 7, 8, 8, 10, 10, 10, 10, 11, 11, 11, 13, 13, 13, 14, 14)
    
    scala> rdd1.sortBy(x => x,false).collect
    res12: Array[Int] = Array(14, 14, 13, 13, 13, 11, 11, 11, 10, 10, 10, 10, 8, 8, 7, 7, 7, 7, 6, 5, 5, 5, 4, 4, 2, 1, 1, 1, 1, 0)
    
    coalesce vs. repartition
    /**
    * Return a new RDD that has exactly numPartitions partitions.
    *
    * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
    * a shuffle to redistribute data.
    *
    * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
    * which can avoid performing a shuffle.
    *
    * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
    */
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      // delegates to coalesce, explicitly setting shuffle = true
      coalesce(numPartitions, shuffle = true)
    }
    
    /**
    * Return a new RDD that is reduced into `numPartitions` partitions.
    *
    * This results in a narrow dependency, e.g. if you go from 1000 partitions
    * to 100 partitions, there will not be a shuffle, instead each of the 100
    * new partitions will claim 10 of the current partitions. If a larger number
    * of partitions is requested, it will stay at the current number of partitions.
    *
    * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
    * this may result in your computation taking place on fewer nodes than
    * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
    * you can pass shuffle = true. This will add a shuffle step, but means the
    * current upstream partitions will be executed in parallel (per whatever
    * the current partitioning is).
    *
    * @note With shuffle = true, you can actually coalesce to a larger number
    * of partitions. This is useful if you have a small number of partitions,
    * say 100, potentially with a few partitions being abnormally large. Calling
    * coalesce(1000, shuffle = true) will result in 1000 partitions with the
    * data distributed using a hash partitioner. The optional partition coalescer
    * passed in must be serializable.
    */
    def coalesce(numPartitions: Int, shuffle: Boolean = false,
                 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
      require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
      if (shuffle) {
        /** Distributes elements evenly across output partitions, starting from a random partition. */
        val distributePartition = (index: Int, items: Iterator[T]) => {
          var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
          items.map { t =>
            // Note that the hash code of the key will just be the key itself. The HashPartitioner
            // will mod it with the number of total partitions.
            position = position + 1
            (position, t)
          }
        } : Iterator[(Int, T)]
    
        // include a shuffle step so that our upstream tasks are still distributed
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](
            mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
            new HashPartitioner(numPartitions)),
          numPartitions,
          partitionCoalescer).values
      } else {
        new CoalescedRDD(this, numPartitions, partitionCoalescer)
      }
    }
    

    Summary:

    • repartition: increases or decreases the number of partitions; always shuffles
    • coalesce: generally used to decrease the number of partitions (no shuffle in that case)
    Common Transformation Operators (3)

    Intersection, union and difference operators between RDDs:

    • intersection(otherRDD): intersection
    • union(otherRDD): union
    • subtract(otherRDD): difference; rdd1.subtract(rdd2) returns the elements of rdd1 that are not in rdd2
    • cartesian(otherRDD): Cartesian product
    • zip(otherRDD): combines two RDDs into a key-value RDD; the two RDDs are expected to have the same number of partitions and the same number of elements, otherwise an exception is thrown

    Wide-dependency (shuffle) operators: intersection, subtract, cartesian

    scala> val rdd1 = sc.range(1,21)
    rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24
    
    scala> val rdd2 = sc.range(10,31)
    rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[3] at range at <console>:24
    
    scala> rdd1.intersection(rdd2).collect
    res0: Array[Long] = Array(15, 20, 10, 16, 11, 17, 12, 13, 18, 19, 14)
    
    scala> rdd1.intersection(rdd2).collect.sorted
    res1: Array[Long] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    
    scala> rdd1.intersection(rdd2).sortBy(x => x).collect
    res3: Array[Long] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    
    scala> rdd1.intersection(rdd2).sortBy(x => x).collect
    res3: Array[Long] = Array(10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    
    scala> rdd1.union(rdd2).sortBy(x => x).collect
    res4: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15, 15, 16, 16, 17, 17, 18, 18, 19, 19, 20, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
    
    scala> rdd1.union(rdd2).distinct.sortBy(x => x).collect
    res5: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
    
    scala> rdd1.subtract(rdd2).sortBy(x => x).collect
    res6: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    
    scala> rdd1.intersection(rdd2).getNumPartitions
    res7: Int = 5
    
    scala> rdd1.union(rdd2).getNumPartitions
    res8: Int = 10
    
    scala> rdd1.subtract(rdd2).getNumPartitions
    res9: Int = 5
    
    scala> val rdd1 = sc.range(1,5)
    rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[63] at range at <console>:24
    
    scala> val rdd2 = sc.range(6,10)
    rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[65] at range at <console>:24
    
    scala> rdd1.cartesian(rdd2).collect
    res10: Array[(Long, Long)] = Array((1,6), (1,7), (1,8), (1,9), (2,6), (2,7), (2,8), (2,9), (3,6), (3,7), (3,8), (3,9), (4,6), (4,7), (4,8), (4,9))
    
    scala> rdd1.cartesian(rdd2).getNumPartitions
    res11: Int = 25
    
    scala> rdd1.zip(rdd2).collect
    res12: Array[(Long, Long)] = Array((1,6), (2,7), (3,8), (4,9))
    
    scala> rdd1.zip(rdd2).getNumPartitions
    res13: Int = 5
    
    scala> val rdd2 = sc.range(1,12)
    rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[71] at range at <console>:24
    
    scala> rdd1.zip(rdd2)
    res14: org.apache.spark.rdd.RDD[(Long, Long)] = ZippedPartitionsRDD2[72] at zip at <console>:28
    
    scala> rdd1.zip(rdd2).collect
    20/10/23 14:17:58 WARN TaskSetManager: Lost task 2.0 in stage 30.0 (TID 182, 172.17.178.97, executor 0): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
      at org.apache.spark.rdd.RDD$$anon$2.hasNext(RDD.scala:914)
      at scala.collection.Iterator.foreach(Iterator.scala:941)
      at scala.collection.Iterator.foreach$(Iterator.scala:941)
      at org.apache.spark.rdd.RDD$$anon$2.foreach(RDD.scala:910)
      at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
      at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
      at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
      at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
      at org.apache.spark.rdd.RDD$$anon$2.to(RDD.scala:910)
      at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
      at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
      at org.apache.spark.rdd.RDD$$anon$2.toBuffer(RDD.scala:910)
      at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
      at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
      at org.apache.spark.rdd.RDD$$anon$2.toArray(RDD.scala:910)
      at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:990)
      at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2101)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      at org.apache.spark.scheduler.Task.run(Task.scala:123)
      at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
    
    

    Notes:

    • union is a narrow dependency: the resulting RDD has as many partitions as the two input RDDs combined
    • cartesian is a wide dependency: the resulting RDD has the product of the two partition counts; use with care

    Action

    Actions trigger the computation of an RDD and produce results.

    An Action triggers a Job: a Spark (Driver) program contains as many Jobs as it contains Action operators.
    Typical Action operators: collect (gathers the data back to the Driver), count (counts the elements of the RDD).
    The call chain of every action, e.g. collect():
    collect() => sc.runJob() => ... => dagScheduler.runJob() => triggers the Job
    Requirement: be able to quickly tell Transformations and Actions apart.
    

    Source:

    /**
    * Return an array that contains all of the elements in this RDD.
    *
    * @note This method should only be used if the resulting array is expected to be small, as
    * all the data is loaded into the driver's memory.
    */
    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }
    .....
    
    /**
    * Run an action job on the given RDD and pass all the results to the resultHandler function as
    * they arrive.
    *
    * @param rdd target RDD to run tasks on
    * @param func a function to run on each partition of the RDD
    * @param partitions set of partitions to run on; some jobs may not want to compute on all
    *   partitions of the target RDD, e.g. for operations like first()
    * @param callSite where in the user program this job was called
    * @param resultHandler callback to pass each result to
    * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
    *
    * @note Throws `Exception` when the job fails
    */
    def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
      val start = System.nanoTime
      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
      ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
      waiter.completionFuture.value.get match {
        case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
                (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
                (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
      }
    }
    

    collect/collectAsMap()

    stats/count/mean/stdev/max/min

    reduce(func)/fold(func)/aggregate(func)

    fold And Aggregate.png

    first(): returns the first element of the RDD

    take(n): returns the first n elements of the RDD

    top(n): returns the top n elements according to the default or a specified ordering

    takeSample(withReplacement, num, [seed]): returns a sample of the data

    foreach(func) / foreachPartition(func): similar to map and mapPartitions, except that foreach is an Action

    saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)

    # stats: returns summary statistics; only available on numeric RDDs
    scala> val rdd1 = sc.range(1,101)
    rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24
    
    scala> rdd1.stats
    res0: org.apache.spark.util.StatCounter = (count: 100, mean: 50.500000, stdev: 28.866070, max: 100.000000, min: 1.000000)
    
    scala> val rdd2 =sc.range(1,101)
    rdd2: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[5] at range at <console>:24
    # stats cannot be called here (not a numeric RDD)
    scala> rdd1.zip(rdd2).stats
    <console>:28: error: value stats is not a member of org.apache.spark.rdd.RDD[(Long, Long)]
           rdd1.zip(rdd2).stats
                          ^
    # count can be called on RDDs of any type
    scala> rdd1.zip(rdd2).count
    res2: Long = 100
    # get the first element
    scala> rdd1.zip(rdd2).first
    res3: (Long, Long) = (1,1)
    # get the first ten elements
    scala> rdd1.zip(rdd2).take(10)
    res4: Array[(Long, Long)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10))
    # sum the elements of rdd1
    scala> rdd1.reduce(_ +_)
    res5: Long = 5050
    
    scala> rdd1.fold(0)(_+_)
    res6: Long = 5050
    
    scala> rdd1.fold(1)(_+_)
    res7: Long = 5053
    
    scala> rdd1.getNumPartitions
    res8: Int = 2
    
    scala> val rdd = sc.makeRDD(1 to 10,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> rdd.getNumPartitions
    res0: Int = 2
    
    scala> rdd.reduce(_ + _)
    res1: Int = 55
    
    scala> rdd.reduce((x,y) =>{
         | println(s"x:$x,y:$y")
         | x+y
         | })
    x:6,y:7
    x:13,y:8
    x:21,y:9
    x:30,y:10
    x:1,y:2
    x:3,y:3
    x:6,y:4
    x:10,y:5
    x:40,y:15
    res2: Int = 55
    
    scala> rdd.fold(1)(_+_)
    res3: Int = 58
    # fold(initial value)(function used for both the per-partition and the cross-partition aggregation)
    scala> rdd.fold(1)((x,y) => {
         | println(s"x:$x,y:$y")
         | x + y
         | })
    x:1,y:1
    x:2,y:2
    x:4,y:3
    x:7,y:4
    x:11,y:5
    x:1,y:6
    x:7,y:7
    x:14,y:8
    x:22,y:9
    x:31,y:10
    x:1,y:16
    x:17,y:41
    res4: Int = 58
    # aggregate(initial value)((per-partition aggregation function), (cross-partition aggregation function))
    scala> rdd.aggregate(1)(_+_,_+_)
    res5: Int = 58
    
    scala> rdd.aggregate(1)((x,y) =>{
         |  println(s"x:$x,y:$y")
         | x+y
         | }
         | ,
         | (a,b) => {
         | println(s"a:$a,b:$b")
         | a+b
         | })
    x:1,y:1
    x:2,y:2
    x:4,y:3
    x:7,y:4
    x:11,y:5
    a:1,b:16
    x:1,y:6
    x:7,y:7
    x:14,y:8
    x:22,y:9
    x:31,y:10
    a:17,b:41
    res6: Int = 58
    
    Action.png

    first, take(n), top(n)

    scala> val rdd = sc.range(1,101)
    rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:24
    # take the first element
    scala> rdd.first
    res0: Long = 1
    # take the first ten elements
    scala> rdd.take(10)
    res1: Array[Long] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    # sort descending and take the top 20 elements
    scala> rdd.top(20)
    res3: Array[Long] = Array(100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81)
    
    scala> val random = scala.util.Random
    random: util.Random.type = scala.util.Random$@34f9275f
    
    scala> var rdd1 = (1 to 100).map(x => random.nextInt(200))
    rdd1: scala.collection.immutable.IndexedSeq[Int] = Vector(100, 97, 145, 12, 156, 82, 70, 67, 84, 152, 39, 10, 164, 138, 174, 176, 182, 111, 171, 129, 191, 145, 190, 114, 177, 193, 139, 163, 134, 188, 193, 14, 91, 66, 143, 91, 6, 39, 51, 86, 62, 179, 121, 16, 94, 76, 116, 193, 85, 155, 108, 176, 69, 92, 20, 121, 173, 94, 150, 93, 86, 59, 62, 40, 30, 162, 64, 43, 156, 176, 6, 41, 154, 53, 186, 47, 88, 10, 21, 181, 74, 112, 153, 1, 56, 11, 17, 42, 94, 182, 171, 194, 23, 16, 167, 165, 123, 133, 71, 198)
    
    scala> val rdd = sc.makeRDD(rdd1)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:26
    
    scala> rdd.take(10)
    res5: Array[Int] = Array(100, 97, 145, 12, 156, 82, 70, 67, 84, 152)
    
    scala> rdd.top(10)
    res6: Array[Int] = Array(198, 194, 193, 193, 193, 191, 190, 188, 186, 182)
    # sampling: the first argument is whether to sample with replacement, the second is how many elements to take, the third (optional) is the random seed
    scala> rdd.takeSample(true,10)
    res7: Array[Int] = Array(84, 143, 56, 129, 163, 11, 190, 176, 177, 12)
    
    scala> rdd.takeSample(true,40)
    res8: Array[Int] = Array(134, 167, 86, 94, 12, 177, 69, 139, 53, 30, 163, 181, 177, 94, 176, 134, 150, 30, 16, 133, 193, 121, 67, 163, 76, 182, 145, 16, 93, 91, 10, 123, 171, 163, 39, 111, 181, 10, 94, 150)
    
    scala> rdd.takeSample(false,40)
    res9: Array[Int] = Array(134, 121, 86, 108, 198, 111, 171, 6, 30, 155, 121, 116, 171, 176, 153, 145, 114, 1, 43, 74, 66, 56, 41, 177, 156, 47, 191, 88, 53, 179, 64, 176, 138, 129, 84, 194, 123, 70, 163, 93)
    

    foreach(func)

    scala> rdd.foreach(_+1)
    
    scala> rdd.collect
    res11: Array[Int] = Array(100, 97, 145, 12, 156, 82, 70, 67, 84, 152, 39, 10, 164, 138, 174, 176, 182, 111, 171, 129, 191, 145, 190, 114, 177, 193, 139, 163, 134, 188, 193, 14, 91, 66, 143, 91, 6, 39, 51, 86, 62, 179, 121, 16, 94, 76, 116, 193, 85, 155, 108, 176, 69, 92, 20, 121, 173, 94, 150, 93, 86, 59, 62, 40, 30, 162, 64, 43, 156, 176, 6, 41, 154, 53, 186, 47, 88, 10, 21, 181, 74, 112, 153, 1, 56, 11, 17, 42, 94, 182, 171, 194, 23, 16, 167, 165, 123, 133, 71, 198)
    
    scala> val rdd = sc.makeRDD(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
    
    scala> rdd.getNumPartitions
    res12: Int = 2
    
    scala> rdd.foreach(x => {
         | println(x+1)
         | }
         | )
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    

    foreachPartition(func)

    scala> val rdd = sc.makeRDD(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> rdd.foreachPartition
       def foreachPartition(f: Iterator[Int] => Unit): Unit
    
    scala> rdd.foreachPartition(iter => iter.foreach(println(_)))
    [Stage 0:>                                                          (0 + 2) / 2]
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    

    saveAsTextFile

    scala> rdd.collect
    res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> rdd.getNumPartitions
    res2: Int = 2
    # saves to HDFS, one file per partition, which easily leads to the small-files problem (the t1 directory contains two files)
    scala> rdd.saveAsTextFile("/spark-test/t1")
    # coalescing to a single partition first produces one file (the t2 directory contains one file)
    scala> rdd.coalesce(1).saveAsTextFile("/spark-test/t2")
    

    Key-Value RDD Operations

    RDDs divide broadly into value RDDs and key-value RDDs. The operations introduced so far are for value RDDs; in practice key-value RDDs, also called pair RDDs, are used even more often. Value RDD operations live mostly in RDD.scala, while key-value RDD operations live in PairRDDFunctions.scala.

    Key-ValueRDD操作1.png

    Most of the operators introduced earlier also work on pair RDDs, but pair RDDs additionally have Transformation and Action operators of their own.

    Creating a Pair RDD
    scala> val rdd = sc.makeRDD(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> rdd.collect
    res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> val arr = (1 to 10).toArray
    arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> val arr1 = arr.map(x =>(x,x * 10,x* 100))
    arr1: Array[(Int, Int, Int)] = Array((1,10,100), (2,20,200), (3,30,300), (4,40,400), (5,50,500), (6,60,600), (7,70,700), (8,80,800), (9,90,900), (10,100,1000))
    # rdd is not a pair RDD
    scala> val rdd = sc.makeRDD(arr1)
    rdd: org.apache.spark.rdd.RDD[(Int, Int, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:26
    
    scala> rdd.first
    res1: (Int, Int, Int) = (1,10,100)
    # res2 is a pair RDD
    scala> rdd.map(x => (x._1,(x._2,x._3))).take(3)
    res2: Array[(Int, (Int, Int))] = Array((1,(10,100)), (2,(20,200)), (3,(30,300)))
    
    scala> rdd.map(x => (x._1,(x._2,x._3))).collectAsMap
    res4: scala.collection.Map[Int,(Int, Int)] = Map(8 -> (80,800), 2 -> (20,200), 5 -> (50,500), 4 -> (40,400), 7 -> (70,700), 10 -> (100,1000), 1 -> (10,100), 9 -> (90,900), 3 -> (30,300), 6 -> (60,600))
    
    
    Transformation Operations

    map-like Operations

    mapValues / flatMapValues / keys / values: all of these could be written with map; they are convenience operations.

    mapValues: operates directly on the values of the pairs

    flatMapValues: operates on the values and then flattens the result

    scala> val a = sc.parallelize(List((1,2),(3,4),(5,6)))
    a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
    
    scala> a.collect
    res5: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
    
    scala> a.collectAsMap
    res6: scala.collection.Map[Int,Int] = Map(5 -> 6, 1 -> 2, 3 -> 4)
    
    scala> rdd.map(x => (x._1,(x._2,x._3))).collect
    res8: Array[(Int, (Int, Int))] = Array((1,(10,100)), (2,(20,200)), (3,(30,300)), (4,(40,400)), (5,(50,500)), (6,(60,600)), (7,(70,700)), (8,(80,800)), (9,(90,900)), (10,(100,1000)))
    
    scala> val b = a.mapValues(x => ( 1 to x))
    b: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[6] at mapValues at <console>:25
    
    scala> b.collect
    res9: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
    
    scala> val c = a.map(x => (x._1,1 to x._2))
    c: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[7] at map at <console>:25
    
    scala> c.collect
    res10: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
    
    scala> val d = a.map{case(k,v) => (k,1 to v)}
    d: org.apache.spark.rdd.RDD[(Int, scala.collection.immutable.Range.Inclusive)] = MapPartitionsRDD[8] at map at <console>:25
    
    scala> d.collect
    res11: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
    
    scala> val e = a.flatMapValues(x => 1 to x)
    e: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[9] at flatMapValues at <console>:25
    
    scala> e.collect
    res12: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))
    
    scala> val f = a.map(x => (x._1,1 to x._2)).collect
    f: Array[(Int, scala.collection.immutable.Range.Inclusive)] = Array((1,Range 1 to 2), (3,Range 1 to 4), (5,Range 1 to 6))
    
    
    scala> val f = a.map(x => (x._1,1 to x._2)).flatMap{case (k ,v) => v.map(elem => (k,elem))}
    f: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[12] at flatMap at <console>:25
    
    scala> f.collect
    res13: Array[(Int, Int)] = Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), (5,1), (5,2), (5,3), (5,4), (5,5), (5,6))
    
    scala> f.keys.collect
    res14: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
    
    scala> f.values.collect
    res15: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6)
    
    scala> f.map{case(k,y) => k}
    res16: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at map at <console>:26
    
    scala> f.map{case(k,y) => k}.collect
    res17: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
    
    scala> f.map{case(k,v) => v}.collect
    res19: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6)
    
    scala> f.map(x=> x._1).collect
    res20: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
    
    scala> f.map{case(k,_) => k}
    res21: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at map at <console>:26
    
    scala> f.map{case(k,_) => k}.collect
    res23: Array[Int] = Array(1, 1, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5)
    
    Aggregation Operations

    Pair RDDs (k, v) are widely used. Aggregation operators include:

    groupByKey/reduceByKey/foldByKey/aggregateByKey

    combineByKey (old) / combineByKeyWithClassTag (new) => the underlying implementation

    subtractByKey: like subtract, removes the elements whose keys also appear in the other RDD

    Small exercise: given the data ("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16), where the key is a book title and the value is its sales on a given day, compute the average value for each key, i.e. the average daily sales of each book.

    scala> val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    # groupByKey
    scala> rdd.groupByKey
    res0: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[1] at groupByKey at <console>:26
    
    scala> rdd.groupByKey.collect
    res1: Array[(String, Iterable[Int])] = Array((scala,CompactBuffer(26, 24)), (spark,CompactBuffer(12, 15, 25, 23, 16)), (hadoop,CompactBuffer(26, 23, 16)))
    
    scala> rdd.groupByKey.mapValues(x => (x.toArray.sum/x.size))
    res5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at mapValues at <console>:26
    
    scala> rdd.groupByKey.mapValues(x => (x.toArray.sum/x.size)).collect
    res6: Array[(String, Int)] = Array((scala,25), (spark,18), (hadoop,21))
    
    scala>  rdd.groupByKey.map(x =>(x._1,x._2.sum) )
    res7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:26
    
    scala>  rdd.groupByKey.map(x =>(x._1,x._2.sum / x._2.size) )
    res8: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:26
    
    scala>  rdd.groupByKey.map(x =>(x._1,x._2.sum / x._2.size) ).collect
    res9: Array[(String, Int)] = Array((scala,25), (spark,18), (hadoop,21))
    
    scala>  rdd.groupByKey.map(x =>(x._1,x._2.sum.toDouble / x._2.size) ).collect
    res10: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    
    scala> rdd.groupByKey.map{case (k,v) => (k,v.sum * 1.0 / v.size)}
    res11: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[18] at map at <console>:26
    
    scala> rdd.groupByKey.map{case (k,v) => (k,v.sum * 1.0 / v.size)}.collect
    res12: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    
    scala> rdd.groupByKey.mapValues(v => v.sum * 1.0 / v.size)
    res13: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[22] at mapValues at <console>:26
    
    scala> rdd.groupByKey.mapValues(v => v.sum * 1.0 / v.size).collect
    res14: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    
    scala> rdd.reduceByKey(_+_).collect
    res16: Array[(String, Int)] = Array((scala,50), (spark,91), (hadoop,65))
    
    scala> rdd.mapValues(x => (x,1)).collect
    res0: Array[(String, (Int, Int))] = Array((spark,(12,1)), (hadoop,(26,1)), (hadoop,(23,1)), (spark,(15,1)), (scala,(26,1)), (spark,(25,1)), (spark,(23,1)), (hadoop,(16,1)), (scala,(24,1)), (spark,(16,1)))
    
    # reduceByKey
    scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._1 + x._1))
    res1: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[3] at reduceByKey at <console>:26
    
    scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._1 + x._1)).collect
    res2: Array[(String, (Int, Int))] = Array((scala,(50,50)), (spark,(91,91)), (hadoop,(65,65)))
    
    scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._2 + x._2)).collect
    res3: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
    
    scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._2 + x._2)).mapValues(v => v._1.toDouble / v._2)
    res4: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[10] at mapValues at <console>:26
    
    scala> rdd.mapValues(x => (x,1)).reduceByKey((x,y) =>(x._1 + y._1,y._2 + x._2)).mapValues(v => v._1.toDouble / v._2).collect
    res5: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    
    scala> rdd.mapValues(x =>(x,1)).collect
    res6: Array[(String, (Int, Int))] = Array((spark,(12,1)), (hadoop,(26,1)), (hadoop,(23,1)), (spark,(15,1)), (scala,(26,1)), (spark,(25,1)), (spark,(23,1)), (hadoop,(16,1)), (scala,(24,1)), (spark,(16,1)))
    #foldByKey
    scala> rdd.mapValues(x => (x,1)).foldByKey((0,0))((x,y) =>(x._1 + y._1,y._2 + x._2)).mapValues(v => v._1.toDouble / v._2).collect
    res10: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    

    aggregateByKey => an initial value + a within-partition aggregation function + a cross-partition aggregation function

    rdd.mapValues(x => (x,1)).aggregateByKey((0,0))((x,y) =>(x._1 + y._1,y._2 + x._2),(a,b) =>(a._1 + b._1,a._2 + b._2)).mapValues(v => v._1.toDouble / v._2).collect
    
    ## the initial value (a tuple here) does not have to match the RDD's value type (Int); x is then the (0,0) tuple and y is an element of the RDD (an Int)
    scala> rdd.aggregateByKey((0,0))(
         | (x,y) =>(x._1 + y,x._2 + 1),
         | (a,b) => (a._1 + b._1 ,a._2 + b._2))
    res29: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[30] at aggregateByKey at <console>:26
    
    scala> rdd.aggregateByKey((0,0))(
         | (x,y) =>(x._1 + y,x._2 + 1),
         | (a,b) => (a._1 + b._1 ,a._2 + b._2)).collect
    res30: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
    
    scala> rdd.aggregateByKey((0,0))(
         | (x,y) =>(x._1 + y,x._2 + 1),
         | (a,b) => (a._1 + b._1 ,a._2 + b._2)).mapValues(x => (x._1.toD))
    toDegrees   toDouble
         | (a,b) => (a._1 + b._1 ,a._2 + b._2)).mapValues(x => (x._1.toDouble / x._2))
    res31: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[33] at mapValues at <console>:28
    
    scala> rdd.aggregateByKey((0,0))(
         | (x,y) =>(x._1 + y,x._2 + 1),
         | (a,b) => (a._1 + b._1 ,a._2 + b._2)).mapValues(x => (x._1.toDouble / x._2)).collect
    res32: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    
    scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]())((x,y) => {x.append(y);x},(a,b) => a++b).collect
    res34: Array[(String, scala.collection.mutable.ArrayBuffer[Int])] = Array((scala,ArrayBuffer(26, 24)), (spark,ArrayBuffer(12, 15, 25, 23, 16)), (hadoop,ArrayBuffer(26, 23, 16)))
    
    scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]())((x,y) => {x.append(y);x},(a,b) => a++b).mapValues(x => (x.sum.toDouble / x.size)).collect
    res36: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    
    
    scala> rdd.aggregateByKey((0,0))((x,y) => (x._1 + y,x._2 + 1),(a,b) => (a._1 + b._1,a._2 + b._2)).collect
    res37: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
    
    
    
    # within-partition merging and cross-partition merging can use different logic
    scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]())((x,y) => {x.append(y);x},(a,b) => a++b).mapValues(x => (x.sum.toDouble / x.size)).collect
    res36: Array[(String, Double)] = Array((scala,25.0), (spark,18.2), (hadoop,21.666666666666668))
    # here x is the (0,0) tuple and y is an element of the rdd (an Int)
    scala> rdd.aggregateByKey((0,0))((x,y) => (x._1 + y,x._2 + 1),(a,b) => (a._1 + b._1,a._2 + b._2)).collect
    res37: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
    
    # combineByKey
    scala> rdd.combineByKey(
         | (x:Int) => (x,1), # the initial value, equivalent to the map(x => (x,1)) done above
         | (x:(Int,Int),y:Int)=>(x._1 + y, x._2 +1), # within-partition aggregation
         # cross-partition aggregation
         | (a:(Int,Int),b:(Int,Int)) => (a._1 + b._1,a._2 + b._2)).collect
    res40: Array[(String, (Int, Int))] = Array((scala,(50,2)), (spark,(91,5)), (hadoop,(65,3)))
    
    ## subtractByKey
    scala> val rdd1 = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at makeRDD at <console>:24
    
    scala> val rdd2 = sc.makeRDD(Array(("spark", 100), ("hadoop", 300)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at makeRDD at <console>:24
    
    scala> rdd1.subtractByKey(rdd2).collect
    res42: Array[(String, Int)] = Array()
    
    scala> val rdd = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at makeRDD at <console>:24
    scala> val other = sc.makeRDD(Array(("a",10), ("b",20), ("c",30)))
    other: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at makeRDD at <console>:24
    
    scala> rdd.subtractByKey(other).collect
    res44: Array[(String, Int)] = Array((d,5))
    
    

    Conclusion: when efficiency is comparable, use the method you are most familiar with; groupByKey is usually inefficient, so use it sparingly.

    Beginners tend to reach for groupByKey when thinking about an implementation; if you do, look for an alternative operator to replace it.

    Why groupByKey is inefficient
    为什么groupByKey的效率低.png

    reduceByKey 和 groupByKey 的相同点和不同点:

    相同点:

    • 都作用于 RDD[K,V]
    • 都是按照 key 进行分组聚合
    • 默认分区数量不变,但都可以通过参数指定分区数量

    不同点:

    • groupByKey 没有聚合函数,返回值类型是 RDD[K, Iterable[V]]
    • reduceByKey 必须传入聚合函数,返回值类型是 RDD[K, 聚合后的V]
    • rdd.groupByKey().mapValues(_.reduce(func)) 与 rdd.reduceByKey(func) 结果等价,但前者会把所有 value 全部 shuffle 到下游,后者会先在 map 端做局部聚合,见下面的示例
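
    下面是一个最小化的对比草稿(假设在 spark-shell 中执行、sc 已创建,数据仅作演示):两种写法结果一致,但 reduceByKey 会先在 map 端对分区内相同的 key 做预聚合再 shuffle,而 groupByKey 会把所有 (k, v) 原样 shuffle 到下游,数据量更大。

    val rdd = sc.makeRDD(List(("spark", 1), ("hadoop", 1), ("spark", 1), ("hadoop", 1), ("spark", 1)))

    // 写法一:reduceByKey,map 端先局部聚合,shuffle 的数据量小
    val r1 = rdd.reduceByKey(_ + _)

    // 写法二:groupByKey + mapValues,所有 value 都会参与 shuffle
    val r2 = rdd.groupByKey().mapValues(_.sum)

    // 二者结果一致
    r1.collect().foreach(println)
    r2.collect().foreach(println)
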
    排序操作

    sortByKey:作用于 PairRDD,按照 key 进行排序,在 org.apache.spark.rdd.OrderedRDDFunctions 中实现:

    排序操作.png
    scala> val a = sc.makeRDD(List("wyp","spark","hadoop","123321","hive"))
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> val b = sc.makeRDD(1 to a.count.toInt)
    b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:26
    
    scala> val c = a.zip(b)
    c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[2] at zip at <console>:27
    
    scala> c.collect
    res1: Array[(String, Int)] = Array((wyp,1), (spark,2), (hadoop,3), (123321,4), (hive,5))
    
    scala> c.sortByKey().collect
    res3: Array[(String, Int)] = Array((123321,4), (hadoop,3), (hive,5), (spark,2), (wyp,1))
    
    scala> c.sortByKey(false).collect
    res4: Array[(String, Int)] = Array((wyp,1), (spark,2), (hive,5), (hadoop,3), (123321,4))
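
    排序并不限于按 key 进行:如果想按 value 排序,常见做法是先交换键值再 sortByKey,或者直接使用 sortBy。下面是一个示例草稿(沿用上面的 c,假设仍在同一个 spark-shell 会话中):

    // 方式一:交换键值后按新 key(原 value)降序排序,再换回来
    c.map { case (k, v) => (v, k) }.sortByKey(false).map { case (v, k) => (k, v) }.collect

    // 方式二:sortBy 直接指定排序字段,第二个参数表示是否升序
    c.sortBy(_._2, false).collect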
    
    join操作

    cogroup/join/leftOuterJoin/rightOuterJoin/fullOuterJoin

    源码:

      /**
       * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
       * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
       * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
       */
      def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
        this.cogroup(other, partitioner).flatMapValues( pair =>
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
        )
      }
    

    练习:

    scala> val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
    rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
    
    #cogroup
    scala> val rdd3 = rdd1.cogroup(rdd2)
    rdd3: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[3] at cogroup at <console>:27
    
    scala> rdd3.collect
    res0: Array[(Int, (Iterable[String], Iterable[String]))] = Array((4,(CompactBuffer(Flink),CompactBuffer(王五))), (6,(CompactBuffer(),CompactBuffer(冯七))), (2,(CompactBuffer(Hadoop),CompactBuffer())), (1,(CompactBuffer(Spark),CompactBuffer())), (3,(CompactBuffer(Kylin),CompactBuffer(李四))), (5,(CompactBuffer(),CompactBuffer(赵六))))
    
    scala> rdd3.foreach(println)
    (4,(CompactBuffer(Flink),CompactBuffer(王五)))
    (6,(CompactBuffer(),CompactBuffer(冯七)))
    (2,(CompactBuffer(Hadoop),CompactBuffer()))
    (1,(CompactBuffer(Spark),CompactBuffer()))
    (3,(CompactBuffer(Kylin),CompactBuffer(李四)))
    (5,(CompactBuffer(),CompactBuffer(赵六)))
    # leftOuterJoin
    scala> rdd1.leftOuterJoin(rdd2).collect
    res2: Array[(Int, (String, Option[String]))] = Array((4,(Flink,Some(王五))), (2,(Hadoop,None)), (1,(Spark,None)), (3,(Kylin,Some(李四))))
    # rightOuterJoin
    scala> rdd1.rightOuterJoin(rdd2).collect
    res3: Array[(Int, (Option[String], String))] = Array((4,(Some(Flink),王五)), (6,(None,冯七)), (3,(Some(Kylin),李四)), (5,(None,赵六)))
    # fullOuterJoin
    scala> rdd1.fullOuterJoin(rdd2).collect
    res4: Array[(Int, (Option[String], Option[String]))] = Array((4,(Some(Flink),Some(王五))), (6,(None,Some(冯七))), (2,(Some(Hadoop),None)), (1,(Some(Spark),None)), (3,(Some(Kylin),Some(李四))), (5,(None,Some(赵六))))
    # join
    scala> rdd1.join(rdd2).collect
    res5: Array[(Int, (String, String))] = Array((4,(Flink,王五)), (3,(Kylin,李四)))
    # join 实际上调用的就是cogroup,
    scala> rdd1.cogroup(rdd2).flatMapValues( pair =>
         |       for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
         |     )
    res8: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[18] at flatMapValues at <console>:28
    
    

    测试上面的那个for循环代码:

    import scala.collection.mutable.ArrayBuffer

    object CogroupJoinDemo {

      def main(args: Array[String]): Unit = {

        // 用本地集合模拟 cogroup 的输出:(key, (左侧 value 集合, 右侧 value 集合))
        //    (4,(CompactBuffer(Flink),CompactBuffer(王五)))
        //    (6,(CompactBuffer(),CompactBuffer(冯七)))
        //    (2,(CompactBuffer(Hadoop),CompactBuffer()))
        //    (1,(CompactBuffer(Spark),CompactBuffer()))
        //    (3,(CompactBuffer(Kylin),CompactBuffer(李四)))
        //    (5,(CompactBuffer(),CompactBuffer(赵六)))
        val arrayBuffer = scala.collection.mutable.Map[Int, (ArrayBuffer[String], ArrayBuffer[String])]()
        arrayBuffer(4) = (ArrayBuffer("Flink"), ArrayBuffer("王五"))
        arrayBuffer(6) = (ArrayBuffer(), ArrayBuffer("冯七"))
        arrayBuffer(2) = (ArrayBuffer("Hadoop"), ArrayBuffer())
        arrayBuffer(1) = (ArrayBuffer("Spark"), ArrayBuffer())
        arrayBuffer(3) = (ArrayBuffer("Kylin"), ArrayBuffer("李四"))
        arrayBuffer(5) = (ArrayBuffer(), ArrayBuffer("赵六"))

        // for 推导式:只有两个集合都非空时才会产生 (i, j) 组合,对应 join 只保留两侧都有的 key
        val intToTuples = arrayBuffer.mapValues(v => for (i <- v._1; j <- v._2) yield (i, j))

        println(arrayBuffer)
        println("*" * 15)
        println(intToTuples)
      }
    }
    

    输出:

    Map(2 -> (ArrayBuffer(Hadoop),ArrayBuffer()), 5 -> (ArrayBuffer(),ArrayBuffer(赵六)), 4 -> (ArrayBuffer(Flink),ArrayBuffer(王五)), 1 -> (ArrayBuffer(Spark),ArrayBuffer()), 3 -> (ArrayBuffer(Kylin),ArrayBuffer(李四)), 6 -> (ArrayBuffer(),ArrayBuffer(冯七)))
    ***************
    Map(2 -> ArrayBuffer(), 5 -> ArrayBuffer(), 4 -> ArrayBuffer((Flink,王五)), 1 -> ArrayBuffer(), 3 -> ArrayBuffer((Kylin,李四)), 6 -> ArrayBuffer())
    
    join 接着会对上面的输出做 flatMapValues:空集合会被压平过滤掉,只保留两侧都匹配到的键值对。
    
    Action操作

    collectAsMap、countByKey、lookup(key)

    collectAsMap:把 key-value RDD 收集为 Driver 端的本地 Map;如果同一个 key 出现多次,只会保留其中一个 value

    scala> val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
    rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
    
    scala> val rdd3 = rdd1.join(rdd2)
    rdd3: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[4] at join at <console>:27
    
    scala> rdd3.collectAsMap
    res0: scala.collection.Map[Int,(String, String)] = Map(4 -> (Flink,王五), 3 -> (Kylin,李四))
    

    countByKey

    源码:

     /**
       * Count the number of elements for each key, collecting the results to a local Map.
       *
       * @note This method should only be used if the resulting map is expected to be small, as
       * the whole thing is loaded into the driver's memory.
       * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
       * returns an RDD[T, Long] instead of a map.
       */
      def countByKey(): Map[K, Long] = self.withScope {
        //将每个key对应的value设置为1,然后对相同的key进行聚合
        self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
      }
    

    演示

    scala> rdd3.countByKey
    res2: scala.collection.Map[Int,Long] = Map(4 -> 1, 3 -> 1)
    

    lookup(key):高效的查找方法,只查找对应分区的数据(如果RDD有分区器的话)

    源码:

    /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
    * 如果该 RDD 有分区器,该方法效率更高:只查询 key 所映射到的那个分区;否则会扫描所有分区的元素
    */
    def lookup(key: K): Seq[V] = self.withScope {
      self.partitioner match {
        case Some(p) =>
          val index = p.getPartition(key)
          val process = (it: Iterator[(K, V)]) => {
            val buf = new ArrayBuffer[V]
            for (pair <- it if pair._1 == key) {
              buf += pair._2
            }
            buf
          } : Seq[V]
          val res = self.context.runJob(self, process, Array(index))
          res(0)
        case None =>
          self.filter(_._1 == key).map(_._2).collect()
      }
    }
    

    演示:

    scala> val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"), ("3","Scala"),("1","Java")))
    rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[7] at makeRDD at <console>:24
    
    scala> rdd1.lookup("1")
    res3: Seq[String] = WrappedArray(Spark, Java)
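
    如果先用 partitionBy 为 RDD 指定分区器,lookup 就会走源码中 Some(p) 的分支,只扫描 key 所映射到的那个分区。下面是一个示例草稿(假设 sc 已创建,数据沿用上面的 rdd1):

    import org.apache.spark.HashPartitioner

    // 指定 HashPartitioner 之后,partitioner 为 Some(HashPartitioner)
    val partitioned = rdd1.partitionBy(new HashPartitioner(4))

    // lookup 只在 "1" 映射到的那个分区内查找,返回包含 Spark、Java 的 Seq
    partitioned.lookup("1")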
    

    输入与输出

    文件输入与输出

    文本文件

    数据读取:textFile(String),可指定单个文件或目录,支持通配符。但这种方式读取大量小文件时效率并不高,应该使用 wholeTextFiles

    /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI. Each file is read as a single record and returned in a
    * key-value pair, where the key is the path of each file, the value is the content of each file.
    *
    * <p> For example, if you have the following files:
    * {{{
    *   hdfs://a-hdfs-path/part-00000
    *   hdfs://a-hdfs-path/part-00001
    *   ...
    *   hdfs://a-hdfs-path/part-nnnnn
    * }}}
    *
    * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
    *
    * <p> then `rdd` contains
    * {{{
    *   (a-hdfs-path/part-00000, its content)
    *   (a-hdfs-path/part-00001, its content)
    *   ...
    *   (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    * @note On some filesystems, `.../path/&#42;` can be a more efficient way to read all files
    *       in a directory rather than `.../path/` or `.../path`
    * @note Partitioning is determined by data locality. This may result in too few partitions
    *       by default.
    *
    * @param path Directory to the input data files, the path can be comma separated paths as the
    *             list of inputs.
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    * @return RDD representing tuples of file path and the corresponding file content
    */
    def wholeTextFiles(
        path: String,
        minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
      assertNotStopped()
      val job = NewHadoopJob.getInstance(hadoopConfiguration)
      // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
      // comma separated files as input. (see SPARK-7155)
      NewFileInputFormat.setInputPaths(job, path)
      val updateConf = job.getConfiguration
      new WholeTextFileRDD(
        this,
        classOf[WholeTextFileInputFormat],
        classOf[Text],
        classOf[Text],
        updateConf,
        minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
    }
    

    返回值是 RDD[(String, String)],其中 key 是文件的路径,value 是文件的内容。

    数据保存:saveAsTextFile(String),参数指定的是输出目录。
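
    下面是文本文件读写的一个最小示例草稿(假设 sc 已创建,路径仅作示意):

    // textFile:按行读取,支持目录、通配符和压缩文件,返回 RDD[String]
    val lines = sc.textFile("/data/input/*.txt")

    // wholeTextFiles:整个文件作为一条记录,返回 RDD[(文件路径, 文件内容)],适合大量小文件
    val files = sc.wholeTextFiles("/data/input")

    // saveAsTextFile:参数是输出目录,每个分区会写出一个 part-xxxxx 文件
    lines.map(_.toUpperCase).saveAsTextFile("/data/output/upper")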

    CSV文件

    读取 CSV(Comma-Separated Values)/ TSV(Tab-Separated Values)数据和读取 JSON 数据相似,都需要先把文件当作普通文本读进来,然后对每一行进行解析,从而实现对 CSV 的读取。

    CSV/TSV数据的输出也是需要将结构化RDD通过相关的库转换成字符串RDD。然后使用Spark的文本文件API写出去。
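
    按照上述思路,一个 CSV 读写的最小示例草稿如下(假设 sc 已创建,路径与字段结构仅作示意):

    // 读:先按普通文本读入,再对每一行按逗号切分
    val csvRDD = sc.textFile("/data/people.csv")
      .map(_.split(",", -1))                 // limit 为 -1,保留行尾的空字段
      .map(arr => (arr(0), arr(1), arr(2)))  // 假设每行有三个字段:name,age,city

    // 写:先把结构化数据拼回字符串,再用文本文件 API 输出
    csvRDD.map { case (name, age, city) => s"$name,$age,$city" }
      .saveAsTextFile("/data/people_out")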

    json文件

    如果一个 JSON 文件每一行就是一条 JSON 记录,那么可以把 JSON 文件当作文本文件来读取,然后利用相关的 JSON 库对每一行数据进行解析。

    JSON数据的输出主要是通过输出之前将结构化数据组成的RDD转为字符串RDD,然后使用Spark文本文件API写出去

    JSON文件的处理使用SparkSQL最为简洁

    SequenceFile

    SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark有专门用来读取SequenceFile的接口,在SparkContext中,可以调用sequenceFile[keyClass,valueClass];

    调用saveAsSequenceFile(Path)保存PairRDD,系统将键和值自动转为Writable类型
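
    SequenceFile 读写的一个最小示例草稿(假设 sc 已创建,路径仅作示意):

    val pairs = sc.makeRDD(List(("spark", 1), ("hadoop", 2), ("flink", 3)))

    // 保存:键和值会被自动转换为 Writable(这里是 Text / IntWritable)
    pairs.saveAsSequenceFile("/data/seq")

    // 读取:需要显式指定 key / value 的类型
    val seqRDD = sc.sequenceFile[String, Int]("/data/seq")
    seqRDD.collect().foreach(println)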

    对象文件

    对象文件是将对象序列化后保存的文件,采用Java的序列化机制

    通过 objectFile[T](path) 读取对象文件,返回对应类型的 RDD;也可以通过调用 saveAsObjectFile(path) 输出对象文件。由于读取时需要反序列化,所以必须指定元素类型。
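
    对象文件读写的一个最小示例草稿(假设 sc 已创建,Person 为自拟的示例类型,路径仅作示意):

    // case class 默认实现了 Serializable,可以直接用 Java 序列化写出
    case class Person(name: String, age: Int)

    val people = sc.makeRDD(List(Person("zhangsan", 20), Person("lisi", 30)))

    // 保存:元素经 Java 序列化后写出
    people.saveAsObjectFile("/data/obj")

    // 读取:因为要反序列化,必须显式指定元素类型
    val back = sc.objectFile[Person]("/data/obj")
    back.collect().foreach(println)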

    JDBC

    见综合案例

    算子综合应用案例

    WordCount ----- Scala

    备注:打包上传服务器运行

    package com.hhb.spark.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-26 21:03
     **/
    object WordCountTest {
    
      def main(args: Array[String]): Unit = {
    
        //1 创建sc
        //    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCountTest")
        //打包到服务器时,不需要有master
        val sparkConf = new SparkConf().setAppName("WordCountTest")
        val sc = new SparkContext(sparkConf)
    
        //2 读取文件
        //    val lines = sc.textFile("/Users/baiwang/myproject/spark/data/wc.dat")
        //动态的,不能写死
        val lines = sc.textFile(args(0))
    
        //3 数据转换
        //将每行数据展开
        val words = lines.flatMap(line => line.split("\\s+"))
        val wordMap = words.map(x => (x, 1))
        val result: RDD[(String, Int)] = wordMap.reduceByKey(_ + _)
    
        //4  输出结果
        result.foreach(println(_))
    
        //5  关闭sc
        sc.stop()
    
       // 打包到集群运行,local模式
    //    spark-submit --master local[*] --class com.hhb.spark.core.WordCountTest \
    //    original-ParseDateWork.jar /azkaban-wc/wc.txt
        //也可以打包后提交到 standalone 模式运行
        
         //Yarn
        // spark-submit --master yarn --class cn.lagou.sparkcore.WordCount \
        // original-LagouBigData-1.0-SNAPSHOT.jar /wcinput/*
      }
    }
    

    WordCount ---- Java

    Spark提供了:Scala、Java、Python、R语言的API; 对 Scala 和 Java 语言的支持最好;

    wordcount-java.png
    package com.hhb.java.core;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;
    
    import java.util.Arrays;
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-26 21:31
     **/
    public class JavaWordCount {
    
        public static void main(String[] args) {
    
            //创建sc
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            //读取文件
            JavaRDD<String> lines = sc.textFile("/Users/baiwang/myproject/spark/data/wc.dat");
            //转换数据
            JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split("\\s+")).iterator());
            JavaPairRDD<String, Integer> wordMap = words.mapToPair(word -> new Tuple2<>(word, 1));
            JavaPairRDD<String, Integer> result = wordMap.reduceByKey((x, y) -> x + y);
    
            // 输出结果
            result.foreach(r -> System.err.println(r));
            //关闭sc
            sc.stop();
    
        }
    }
    

    备注:

    • Spark入口点:JavaSparkContext
    • Value-RDD:JavaRDD,key-value RDD:JavaPairRDD
    • JavaRDD和JavaPairRDD转化
      • JavaRDD => JavaPairRDD: 通过mapToPair函数
      • JavaPairRDD => JavaRDD: 通过map函数转化
    • lambda表达式使用 ->

    计算圆周率

    使用蒙特卡洛思想

    蒙特卡洛.png
    package com.hhb.spark.core
    
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.math.random
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-26 22:00
     **/
    object SparkPi {
    
      /**
       * 计算圆周率,使用蒙特卡洛法:落在四分之一圆内的点数 n 与总点数 N 满足 pi/4 ≈ n/N,即 pi ≈ 4*n/N
       *
       * @param args
       */
      def main(args: Array[String]): Unit = {
    
        //1. 创建sc
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
        //2. 读取数据
        val N = 10000000
        val slices: Int = if (args.length > 0) args(0).toInt else 10
    
        val m = sc.makeRDD(1 to N, slices)
          .map(_ => {
            val (x, y) = (random, random)
            if (x * x + y * y <= 1) 1 else 0
          })
        //3. 转换数据
        val n = m.reduce(_ + _)
        //4. 输出
        println(s"pi : ${4.0 * n / N}")
        // 关闭sc
        sc.stop()
    
    
      }
    
    }
    

    广告数据统计

    数据格式:timestamp province city userid adid 时间点,省份,城市,用户,广告

    需求:

    1. 统计每一个省份点击TOP3的广告
    2. 统计每一个省份每一个小时的Top3的广告
    统计每一个省份点击TOP3的广告
    package com.hhb.spark.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description: 需求1:统计每个省份点击 TOP3 的广告ID
     * @author: huanghongbo
     * @date: 2020-10-26 22:20
     **/
    object AdstatTest1 {
    
      def main(args: Array[String]): Unit = {
    
        //1. 创建sc
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
        //2. 读取数据
        val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/advert.log")
        //3. 转换数据
        // 字段:时间、省份、城市、用户、广告
        val lineRDD: RDD[((String, String), Int)] = lines.map {
          line => {
            val lineArray: Array[String] = line.split("\\s+")
            ((lineArray(1), lineArray(4)), 1)
          }
        }
        //(Henan,(5,2189))
        //(Hebei,(7,2250))
        //(Henan,(0,2237))
        //(Jiangsu,(1,2166))
        //(Henan,(7,2151))
        //(Hebei,(8,2240))
        //(Hunan,(7,2132))
        //(Hunan,(0,2162))
        //(Hubei,(7,2150))
        val priRDD: RDD[(String, (String, Int))] = lineRDD.reduceByKey(_ + _).map {
          case ((a, b), c) => (a, (b, c))
        }
        //    (Hunan,CompactBuffer((7,2132), (0,2162), (4,2140), (8,2189), (2,2193), (9,2122), (3,2157), (1,2202), (6,2082), (5,2273)))
        priRDD.groupByKey()
          //(Hunan,List((5,2273), (1,2202), (2,2193), (8,2189), (0,2162), (3,2157), (4,2140), (7,2132), (9,2122), (6,2082)))
          //对每个省的数据取所有的value,转换成list后排序,取前三个
          .mapValues(buf => buf.toList.sortWith(_._2 > _._2).take(3).map {
            case (x, y) => x
          }.mkString(":")).collect().foreach(println(_))
    
        //4. 输出
    
        // 关闭sc
        sc.stop()
    
      }
    }
    
    统计每一个省份每一个小时的Top3的广告
    package com.hhb.spark.core
    
    import java.time.{LocalDateTime, ZoneOffset}
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description: 统计每一个省份每一个小时的 TOP3广告ID 
     * @author: huanghongbo
     * @date: 2020-10-27 13:35
     **/
    object AdstatTest2 {
    
      /**
       * 数据格式:时间点 省份 城市 用户 广告
       *
       * @param args
       */
      def main(args: Array[String]): Unit = {
        //创建sc
        val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]"))
    
        //读取数据
        val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/advert.log")
    
        //数据转换
        val wordRDD: RDD[((Int, String, String), Int)] = lines.map(line => {
          val words = line.split("\\s+")
          ((LocalDateTime.ofEpochSecond(words(0).toLong / 1000, 0, ZoneOffset.ofHours(8)).getHour, words(1), words(4)), 1)
        })
        //按 key 进行聚合
        wordRDD.reduceByKey(_ + _)
          //进行数据格式转换
          .map { case ((a, b, c), d) => ((a, b), (c, d)) }
          //将相同的key进行聚合
          .groupByKey()
          //按 value 的第二个元素(点击次数)降序排序,然后取前三个
          .mapValues(buf => buf.toList.sortWith(_._2 > _._2).take(3))
          .collect().foreach(println(_))
        //关闭sc
        sc.stop()
      }
    }
    
    总结
    总结.png

    如果将上面两个需求合并到同一个 main 中,读取数据得到 lines 后,先计算第一个需求,再用同一个 lines 计算第二个需求,那么数据文件会被读取几次呢?

    回答:两次。因为 lines 没有缓存,两个需求各自触发一次 action,RDD 会沿血缘关系重新计算,文件也就被重新读取一次;如果在第一个 action 之前先执行 lines.cache(),则只会读取一次。
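
    一个示意性的写法如下(仅演示 cache 的用法,路径沿用上文,两个需求的具体计算用注释代替):

    // 在两个需求共用的 lines 上加缓存
    val lines = sc.textFile("/Users/baiwang/myproject/spark/data/advert.log")
    lines.cache()          // 等价于 persist(StorageLevel.MEMORY_ONLY)

    // 需求一:第一次 action 触发真正的文件读取,并把分区数据放进缓存
    // ……(统计每个省份点击 TOP3 的广告)

    // 需求二:直接复用缓存中的 lines,不再访问数据文件
    // ……(统计每个省份每小时的 TOP3 广告)

    lines.unpersist()      // 用完后释放缓存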

    找共同好友

    Super WordCount

    要求:将单词全部转换为小写,去除标点符号(难),去除停用词(难);最后按照 count 值降序保存到文件,同时将全部结果保存到MySQL(难);标点符号和停用词可 以自定义。

    停用词:语言中包含很多功能词。与其他词相比,功能词没有什么实际含义。最普遍 的功能词是限定词,介词(on、in、to、from、 over等)、代词、数量词等。

    Array[(String, Int)] => scala jdbc => MySQL

    第一个版本

    package com.hhb.spark.core
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-27 17:41
     **/
    object SuperWordCount {
    
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    
        val lines = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
        val list = "and of see the to a in".split("\\s+")
    
    
        val p = """[()\\?\\.,:;'’”“!\\? ]"""
        lines.flatMap(_.split("\\s+"))
          .map(word => {
            word.toLowerCase()
              .replaceAll(p, "")
          }).filter(word => word.trim.length > 0 && !list.contains(word))
          .map((_, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
          .collect().foreach(println(_))
      }
    }
    

    第二个版本:

    package com.hhb.spark.core
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-29 10:46
     **/
    object SuperWordCount1 {
    
      private val p = """[()\\?\\.,:;'’”“!\\? ]"""
      private val list = "and of see the to a in".split("\\s+")
    
      private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
      private val userName = "hive"
      private val password = "12345678"
      private val sql = "insert  into test (wort,total) values(?,?);"
    
      def main(args: Array[String]): Unit = {
    
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
        val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
        lines.flatMap(_.split("\\s+"))
          .map(x => {
            x.toLowerCase.replaceAll(p, "")
          })
          .filter(word => word.trim.length > 0 && !list.contains(word))
          .map((_, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
          .foreach { case (k, v) => {
            var conn: Connection = null
            var st: PreparedStatement = null
            try {
              conn = DriverManager.getConnection(url, userName, password)
              st = conn.prepareStatement(sql)
              st.setString(1, k)
              st.setInt(2, v)
              st.executeUpdate()
            } catch {
              case e: Exception => e.printStackTrace()
            } finally {
              if (st != null) st.close()
              if (conn != null) conn.close()
            }
          }
          }
      }
    }
    

    scala链接JDBC

    package com.hhb.spark.core
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-28 21:01
     **/
    object JDBCDemo {
    
      def main(args: Array[String]): Unit = {
    
        val list = "a b c d e f g".split("\\s+").zipWithIndex
        list.foreach(println(_))
        val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
        val userName = "hive"
        val password = "12345678"
        val sql = "insert  into test (wort,total) values(?,?);"
    
        var conn: Connection = null
        var st: PreparedStatement = null
    
        try {
          conn = DriverManager.getConnection(url, userName, password)
          st = conn.prepareStatement(sql)
          list.foreach(w => {
            st.setString(1, w._1)
            st.setInt(2, w._2)
            st.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (st != null) st.close()
          if (conn != null) conn.close()
        }
    
      }
    }
    

    pom

    <!-- JDBC -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.44</version>
    </dependency>
    

    第三个版本

    package com.hhb.spark.core
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-29 10:46
     **/
    object SuperWordCount2 {
    
      private val p = """[()\\?\\.,:;'’”“!\\? ]"""
      private val list = "and of see the to a in".split("\\s+")
    
      private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
      private val userName = "hive"
      private val password = "12345678"
      private val sql = "insert  into test (wort,total) values(?,?);"
      private var count = 0;
    
      def main(args: Array[String]): Unit = {
    
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
        val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
        lines.flatMap(_.split("\\s+"))
          .map(x => {
            x.toLowerCase.replaceAll(p, "")
          })
          .filter(word => word.trim.length > 0 && !list.contains(word))
          .map((_, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
          .foreachPartition(iter => {
            var conn: Connection = null
            var st: PreparedStatement = null
            conn = DriverManager.getConnection(url, userName, password)
            st = conn.prepareStatement(sql)
            count += 1
            println(count)
            try {
              iter.foreach { case (k, v) => {
                st.setString(1, k)
                st.setInt(2, v)
                st.executeUpdate()
              }
              }
            } catch {
              case e: Exception => e.printStackTrace()
            }
            finally {
              if (st != null) st.close()
              if (conn != null) conn.close()
            }
          }
          )
      }
    }
    

    第四个版本

    package com.hhb.spark.core
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description:
     * @author: huanghongbo
     * @date: 2020-10-29 10:46
     **/
    object SuperWordCount3 {
    
      private val p = """[()\\?\\.,:;'’”“!\\? ]"""
      private val list = "and of see the to a in".split("\\s+")
    
      private val url = "jdbc:mysql://linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
      private val userName = "hive"
      private val password = "12345678"
      private val sql = "insert  into test (wort,total) values(?,?);"
      private var count = 0;
    
      def main(args: Array[String]): Unit = {
    
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
        val lines: RDD[String] = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
        val result = lines.flatMap(_.split("\\s+"))
          .map(x => {
            x.toLowerCase.replaceAll(p, "")
          })
          .filter(word => word.trim.length > 0 && !list.contains(word))
          .map((_, 1))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
        result.saveAsTextFile("/Users/baiwang/myproject/spark/data/super")
    
        result.foreachPartition(iter => insert(iter))
      }
    
      def insert(iter: Iterator[(String, Int)]): Unit = {
        var conn: Connection = null
        var st: PreparedStatement = null
        conn = DriverManager.getConnection(url, userName, password)
        st = conn.prepareStatement(sql)
        count += 1
        println(count)
        try {
          iter.foreach { case (k, v) => {
            st.setString(1, k)
            st.setInt(2, v)
            st.executeUpdate()
          }
          }
        } catch {
          case e: Exception => e.printStackTrace()
        }
        finally {
          if (st != null) st.close()
          if (conn != null) conn.close()
        }
      }
    }
    

    总结:最终优化版本使用foreachPartition 代替 foreach

    备注:

    • SparkSQL有方便读写MySQL的方法,给参数直接调用即可
    • 但是掌握以上方法非常有必要,因为SparkSQL不支持所有类型的数据库
