美文网首页
Spark-RDD算子

Spark-RDD算子

作者: 布莱安托 | 来源:发表于2020-07-06 19:58 被阅读0次

    map

    • 说明
      通过指定变换函数将原有RDD中的元素逐个进行变换,并返回一个新的RDD
    • 示例
    val rdd = sc.parallelize(1 to 5)
    val mapRdd = rdd.map(_ * 2)
    mapRdd.collect.foreach(println)
    
    • 结果

    2
    4
    6
    8
    10

    mapPartitions

    • 说明
      与map类似,但独立的运行在RDD的每个分区上。通过分区数据集迭代器的方式,对整个分区进行处理。
    • 示例
    val rdd = sc.parallelize(1 to 5)
    // mapPartitions由于是对整个分区进行计算,所以减少了与Executor的交互次数,提高了效率
    // 但当分区数据较大时,可能或出现内存溢出(OOM)
    val mapPartitionsRdd = rdd.mapPartitions(iter => {
        iter.map(_ * 2)
    })
    mapPartitionsRdd.collect.foreach(println)
    
    • 结果

    2
    4
    6
    8
    10

    mapPartitionsWithIndex(func)

    • 说明
      类似于mapPartitions,但变换函数带有一个表示分区的整形参数
    • 示例
    val rdd = sc.parallelize(1 to 5)
    val mapPartitionsWithIndexRdd = rdd.mapPartitionsWithIndex((index,iter) => {
       iter.map((index, _)) 
    });
    mapPartitionsWithIndexRdd.collect.foreach(println)
    
    • 结果

    (1, 1)
    (1, 2)
    (2, 3)
    (2, 4)
    (3, 5)

    flapMap(func)

    • 说明
      类似map,但每一个输入元素可以被映射称为0个或多个输出元素
    • 示例
    val rdd = sc.parallelize(Array(List(1,2), List(3,4)))
    val flatMapRdd = rdd.flatMap(x => x)
    flatMapRdd.collect.foreach(println)
    
    • 结果

    1
    2
    3
    4

    glom

    • 说明
      将每一个分区形成一个数组
    • 示例
    val rdd = sc.parallelize(1 to 12, 4)
    val glomRdd = rdd.glom
    glomRdd.collect.foreach(println(_.mkString(",")))
    
    • 结果

    1,2,3
    4,5,6
    7,8,9
    10,11,12

    groupBy(func)

    • 说明
      按照传入函数的返回值进行分组,将相同键的对应值放入用一个迭代器
    • 示例
    // 按照指定规则进行分组,分组后的数据为一个对偶元组
    //key表示分组键,value表示分组数据
    val rdd = sc.parallelize(1 to 4)
    val groupByRdd = rdd.groupBy(_ % 2)
    groupByRdd.collectr.foreach(println)
    
    • 结果

    (0, CompactBuffer(2,4))
    (1, CompactBuffer(1,3))

    filter(func)

    • 说明
      根据指定规则进行过滤,将输入经过func计算,返回计算结果为true的值
    • 示例
    val rdd = sc.parallelize(1 to 4)
    val filterRdd = rdd.filter(_ % 2 == 0)
    filterRdd.collect.foreach(println)
    
    • 结果

    2
    4

    sample(withReplacement, fraction, seed)

    • 说明
      以指定的随机种子进行抽样比例为fraction的随机抽样,withReplacement表示抽样是否有放回,seed为随机数生成器种子
    • 示例
    val rdd = sc.parallelize(1 to 10)
    val sampleRdd = rdd.sample(false, 0.4)
    sampleRdd.collect().foreach(println)
    
    • 结果

    1
    2
    6
    8

    distinct([numPartitions])

    • 说明
      对源RDD进行去重操作,默认只能执行8个并行任务,也可以通过numTasks进行指定
    • 示例
    val rdd = sc.parallelize(Array(1,2,1,3,4,2,3,1))
    val distinctRdd = rdd.distinct
    distinctRdd.collect.foreach(println)
    
    • 结果

    1
    2
    3
    4

    coalesce(numPartitions)

    • 说明
      缩减分区数,用于大数据集过滤后提高小数据集的执行效率
    • 示例
    val rdd = sc.parallelize(1 to 12, 4)
    System.out.println("before coalesce: " + rdd.partitions.size)
    val coalesceRdd = rdd.coalesce(2)
    System.out.println("after coalesce: " + coalesceRdd.partitions.size)
    
    • 结果

    before coalesce: 4
    after coalesce: 2

    repartition(numPartitions)

    • 说明
      根据指定的分区数,对数据进行重新分区
    • 示例
    val rdd = sc.parallelize(1 to 12, 4)
    System.out.println("before repartition: " + rdd.partitions.size)
    val repartitionRdd = rdd.repartition(6)
    System.out.println("after repartition: " + repartitionRdd.partitions.size)
    
    • 结果

    before repartition: 4
    after repartition: 6

    [注]

    coalesce重新分区,可以选择是否进行shuffle,由参数shuffle: Boolean = true/false决定
    事实上,repartition是调用的coalesce方法,并且将shuffle设置为true

    sortBy(func, [asceding], [numPartitions])

    • 说明
      使用func函数对数据处理后,对数据进行排序,默认是正序排序
    • 示例
    val rdd = sc.parallelize(Array(3,1,4,2))
    val sortByRdd = rdd.sortBy( x => x )
    sortByRdd.collect.foreach(println)
    
    • 结果

    1
    2
    3
    4

    union(otherDataset)

    • 说明
      将两个RDD求并集并返回一个新的RDD
    • 示例
    val rdd1 = sc.parallelize(1 to 5)
    val rdd2 = sc.parallelize(6 to 10)
    val unionRdd = rdd1.union(rdd2)
    unionRdd.collect.foreach(println)
    
    • 结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    cartesian(otherDataset)

    • 说明
      对两个RDD求笛卡尔积
    • 示例
    val rdd1 = sc.parallelize(1 to 3)
    val rdd2 = sc.parallelize(2 to 5)
    val cartesianRdd = rdd1.cartesian(rdd2)
    cartesianRdd.collect.foreach(println)
    
    • 结果

    (1,2)
    (1,3)
    (1,4)
    (1,5)
    (2,2)
    (2,3)
    (2,4)
    (2,5)
    (3,2)
    (3,3)
    (3,4)
    (3,5)

    zip(otherDataset)

    • 说明
      将两个RDD组合成为KV类型的RDD,默认两个RDD分区数以及元素数量相同,否则会报错
    • 示例
    val rdd1 = sc.parallelize(Array(1,2,3), 3)
    val rdd2 = sc.parallelize(Array("a","b","c"), 3)
    val zipRdd = rdd1.zip(rdd2)
    zipRdd.collect.foreach(println)
    
    • 结果

    (1,a)
    (2,b)
    (3,c)

    partitionBy

    • 说明
      根据指定的分区器对RDD进行分区
    • 示例
    val rdd = sc.parallelize(Array((1,"a"), (2,"b"), (3,"c"), (4, "d")), 4)
    val partitionByRdd = rdd.partitionBy(new HashPartitioner(2))
    partitionByRdd.collect.foreach(println)
    
    • 结果

    Array((2, "b"), (4, "d"))
    Array((1, "a"), (3, "c"))

    groupByKey

    • 说明
      按照key进行分组
    • 示例
    val rdd = sc.parallelize(Array("a", "a", "b", "c", "c", "c"))
    val pairRdd = rdd.map((_, 1))
    val groupRdd = pairRdd.groupByKey()
    groupRdd.collect.foreach(println)
    
    • 结果

    ("a", CompactBuffer(1, 1))
    ("b", CompactBuffer(1))
    ("c", CompactBuffer(1, 1, 1))

    reduceByKey(func, [numPartitions])

    • 说明
      通过制定函数对相同key的值进行聚合
    • 示例
    val rdd = sc.parallelize(Array("a", "a", "b", "c", "c", "c"))
    val pairRdd = rdd.map((_, 1))
    val groupRdd = pairRdd.groupByKey()
    val reduceRdd = groupRdd.reduceByKey(_ + _)
    reduceRdd.collect.foreach(println)
    
    • 结果

    ("a", 2)
    ("b", 1)
    ("c", 3)

    aggregateByKey(zeroValue)(seqOp, combOp)

    • 说明
      将各个分区内的数据将zeroValue作为初始值的情况下,先通过seqOp函数进行分区内聚合,在通过combOp函数将各分区的聚合数据进行合并聚合
    • 示例
    val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val aggrRdd = rdd.aggregateByKey(0)(Maths.max(_, _), _ + _)
    aggrRdd.collect.foreach(println)
    
    • 结果

    ("b", 3)
    ("a", 3)
    ("c", 12)

    foldByKey(zeroValue)(func)

    • 说明
      与aggregateByKey功能相似,seqOp与combOp相同
    • 示例
    val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val foldRdd = rdd.foldByKey(0)(_ + _)
    foldRdd.collect.foreach(println)
    
    • 结果

    ("b", 3)
    ("a", 5)
    ("c", 18)

    combineByKey(createCombiner, mergeValue, mergeCombiner)

    • 说明

      遍历分区内全部key,如果是第一次遍历到该key则通过createCombiner函数创建初始值,遍历后通过mergeValue函数对分区内key相同的值进行聚合,最后通过mergeCombiner函数将各个分区的聚合结果,按照key进行聚合

    • 示例

    // 例:计算每个key的平均值
    val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val combineRdd = rdd.combineByKey((_, 1), (a, v) => (a._1 + v,  a._2 + 1), (a1, a2) => (a1._1 + a2._1, a1._2 + a2._2))
    val avgRdd = combineRdd.map((k, v) => (k, v._1 / v._2))
    avg.Rdd.collect.foreach(println)
    
    • 结果

    ("a", 2.5)

    ("b", 4.5)

    ("c", 9)

    sortByKey([ascending], [numPartitions])

    • 说明

      按照key值进行排序,其中key的类型必须实现Ordered接口

    • 示例

    val rdd = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
    val sortRdd = rdd.sortByKey(true)
    sortRdd.collect.foreach(println)
    
    • 结果

    (1,"a")

    (2,"b")

    (3,"c")

    (4, "d")

    mapValues

    • 说明

      对kv对RDD中的value进行map操作

    • 示例

    val rdd = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
    val upperRdd = rdd.mapValues(_.toUpperCase)
    upperRdd.collect.foreach(println)
    
    • 结果

    (1,"A")

    (2,"B")

    (3,"C")

    (4, "D")

    join(otherDataset)

    • 说明

      将两个RDD具有相同key的元素进行组合

    • 示例

    val rdd1 = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
    val rdd2 = sc.parallelize(Array((4, 3), (2, 1), (3, 10), (1, 5)))
    val joinRdd = rdd1.join(rdd2)
    joinRdd.collect.foreach(println)
    
    • 结果

    (1, ("a", 5))

    (2, ("b", 1))

    (3, ("c", 10))

    (4, ("d", 3))

    cogroup(otherDataset, [numPartitions])

    • 说明

      将两个KV类型RDD组合,将value形成集合返回

    • 示例

    val rdd1 = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
    val rdd2 = sc.parallelize(Array((4, 3), (3, 10), (1, 5)))
    val cogroupRdd = rdd1.cogroup(rdd2)
    cogroupRdd.collect.foreach(println)
    
    • 结果

    (1, (Compact("a"), Compact(5)))

    (2, (Compact("b"), Compact()))

    (3, (Compact("c"), Compact(10)))

    (4, (Compact("d"), Compact(3)))

    相关文章

      网友评论

          本文标题:Spark-RDD算子

          本文链接:https://www.haomeiwen.com/subject/mwbgqktx.html