美文网首页
Spark-RDD算子

Spark-RDD算子

作者: 布莱安托 | 来源:发表于2020-07-06 19:58 被阅读0次

map

  • 说明
    通过指定变换函数将原有RDD中的元素逐个进行变换,并返回一个新的RDD
  • 示例
val rdd = sc.parallelize(1 to 5)
val mapRdd = rdd.map(_ * 2)
mapRdd.collect.foreach(println)
  • 结果

2
4
6
8
10

mapPartitions

  • 说明
    与map类似,但独立的运行在RDD的每个分区上。通过分区数据集迭代器的方式,对整个分区进行处理。
  • 示例
val rdd = sc.parallelize(1 to 5)
// mapPartitions由于是对整个分区进行计算,所以减少了与Executor的交互次数,提高了效率
// 但当分区数据较大时,可能或出现内存溢出(OOM)
val mapPartitionsRdd = rdd.mapPartitions(iter => {
    iter.map(_ * 2)
})
mapPartitionsRdd.collect.foreach(println)
  • 结果

2
4
6
8
10

mapPartitionsWithIndex(func)

  • 说明
    类似于mapPartitions,但变换函数带有一个表示分区的整形参数
  • 示例
val rdd = sc.parallelize(1 to 5)
val mapPartitionsWithIndexRdd = rdd.mapPartitionsWithIndex((index,iter) => {
   iter.map((index, _)) 
});
mapPartitionsWithIndexRdd.collect.foreach(println)
  • 结果

(1, 1)
(1, 2)
(2, 3)
(2, 4)
(3, 5)

flapMap(func)

  • 说明
    类似map,但每一个输入元素可以被映射称为0个或多个输出元素
  • 示例
val rdd = sc.parallelize(Array(List(1,2), List(3,4)))
val flatMapRdd = rdd.flatMap(x => x)
flatMapRdd.collect.foreach(println)
  • 结果

1
2
3
4

glom

  • 说明
    将每一个分区形成一个数组
  • 示例
val rdd = sc.parallelize(1 to 12, 4)
val glomRdd = rdd.glom
glomRdd.collect.foreach(println(_.mkString(",")))
  • 结果

1,2,3
4,5,6
7,8,9
10,11,12

groupBy(func)

  • 说明
    按照传入函数的返回值进行分组,将相同键的对应值放入用一个迭代器
  • 示例
// 按照指定规则进行分组,分组后的数据为一个对偶元组
//key表示分组键,value表示分组数据
val rdd = sc.parallelize(1 to 4)
val groupByRdd = rdd.groupBy(_ % 2)
groupByRdd.collectr.foreach(println)
  • 结果

(0, CompactBuffer(2,4))
(1, CompactBuffer(1,3))

filter(func)

  • 说明
    根据指定规则进行过滤,将输入经过func计算,返回计算结果为true的值
  • 示例
val rdd = sc.parallelize(1 to 4)
val filterRdd = rdd.filter(_ % 2 == 0)
filterRdd.collect.foreach(println)
  • 结果

2
4

sample(withReplacement, fraction, seed)

  • 说明
    以指定的随机种子进行抽样比例为fraction的随机抽样,withReplacement表示抽样是否有放回,seed为随机数生成器种子
  • 示例
val rdd = sc.parallelize(1 to 10)
val sampleRdd = rdd.sample(false, 0.4)
sampleRdd.collect().foreach(println)
  • 结果

1
2
6
8

distinct([numPartitions])

  • 说明
    对源RDD进行去重操作,默认只能执行8个并行任务,也可以通过numTasks进行指定
  • 示例
val rdd = sc.parallelize(Array(1,2,1,3,4,2,3,1))
val distinctRdd = rdd.distinct
distinctRdd.collect.foreach(println)
  • 结果

1
2
3
4

coalesce(numPartitions)

  • 说明
    缩减分区数,用于大数据集过滤后提高小数据集的执行效率
  • 示例
val rdd = sc.parallelize(1 to 12, 4)
System.out.println("before coalesce: " + rdd.partitions.size)
val coalesceRdd = rdd.coalesce(2)
System.out.println("after coalesce: " + coalesceRdd.partitions.size)
  • 结果

before coalesce: 4
after coalesce: 2

repartition(numPartitions)

  • 说明
    根据指定的分区数,对数据进行重新分区
  • 示例
val rdd = sc.parallelize(1 to 12, 4)
System.out.println("before repartition: " + rdd.partitions.size)
val repartitionRdd = rdd.repartition(6)
System.out.println("after repartition: " + repartitionRdd.partitions.size)
  • 结果

before repartition: 4
after repartition: 6

[注]

coalesce重新分区,可以选择是否进行shuffle,由参数shuffle: Boolean = true/false决定
事实上,repartition是调用的coalesce方法,并且将shuffle设置为true

sortBy(func, [asceding], [numPartitions])

  • 说明
    使用func函数对数据处理后,对数据进行排序,默认是正序排序
  • 示例
val rdd = sc.parallelize(Array(3,1,4,2))
val sortByRdd = rdd.sortBy( x => x )
sortByRdd.collect.foreach(println)
  • 结果

1
2
3
4

union(otherDataset)

  • 说明
    将两个RDD求并集并返回一个新的RDD
  • 示例
val rdd1 = sc.parallelize(1 to 5)
val rdd2 = sc.parallelize(6 to 10)
val unionRdd = rdd1.union(rdd2)
unionRdd.collect.foreach(println)
  • 结果

1
2
3
4
5
6
7
8
9
10

cartesian(otherDataset)

  • 说明
    对两个RDD求笛卡尔积
  • 示例
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRdd = rdd1.cartesian(rdd2)
cartesianRdd.collect.foreach(println)
  • 结果

(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)

zip(otherDataset)

  • 说明
    将两个RDD组合成为KV类型的RDD,默认两个RDD分区数以及元素数量相同,否则会报错
  • 示例
val rdd1 = sc.parallelize(Array(1,2,3), 3)
val rdd2 = sc.parallelize(Array("a","b","c"), 3)
val zipRdd = rdd1.zip(rdd2)
zipRdd.collect.foreach(println)
  • 结果

(1,a)
(2,b)
(3,c)

partitionBy

  • 说明
    根据指定的分区器对RDD进行分区
  • 示例
val rdd = sc.parallelize(Array((1,"a"), (2,"b"), (3,"c"), (4, "d")), 4)
val partitionByRdd = rdd.partitionBy(new HashPartitioner(2))
partitionByRdd.collect.foreach(println)
  • 结果

Array((2, "b"), (4, "d"))
Array((1, "a"), (3, "c"))

groupByKey

  • 说明
    按照key进行分组
  • 示例
val rdd = sc.parallelize(Array("a", "a", "b", "c", "c", "c"))
val pairRdd = rdd.map((_, 1))
val groupRdd = pairRdd.groupByKey()
groupRdd.collect.foreach(println)
  • 结果

("a", CompactBuffer(1, 1))
("b", CompactBuffer(1))
("c", CompactBuffer(1, 1, 1))

reduceByKey(func, [numPartitions])

  • 说明
    通过制定函数对相同key的值进行聚合
  • 示例
val rdd = sc.parallelize(Array("a", "a", "b", "c", "c", "c"))
val pairRdd = rdd.map((_, 1))
val groupRdd = pairRdd.groupByKey()
val reduceRdd = groupRdd.reduceByKey(_ + _)
reduceRdd.collect.foreach(println)
  • 结果

("a", 2)
("b", 1)
("c", 3)

aggregateByKey(zeroValue)(seqOp, combOp)

  • 说明
    将各个分区内的数据将zeroValue作为初始值的情况下,先通过seqOp函数进行分区内聚合,在通过combOp函数将各分区的聚合数据进行合并聚合
  • 示例
val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
val aggrRdd = rdd.aggregateByKey(0)(Maths.max(_, _), _ + _)
aggrRdd.collect.foreach(println)
  • 结果

("b", 3)
("a", 3)
("c", 12)

foldByKey(zeroValue)(func)

  • 说明
    与aggregateByKey功能相似,seqOp与combOp相同
  • 示例
val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
val foldRdd = rdd.foldByKey(0)(_ + _)
foldRdd.collect.foreach(println)
  • 结果

("b", 3)
("a", 5)
("c", 18)

combineByKey(createCombiner, mergeValue, mergeCombiner)

  • 说明

    遍历分区内全部key,如果是第一次遍历到该key则通过createCombiner函数创建初始值,遍历后通过mergeValue函数对分区内key相同的值进行聚合,最后通过mergeCombiner函数将各个分区的聚合结果,按照key进行聚合

  • 示例

// 例:计算每个key的平均值
val rdd = sc.parallelize(Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
val combineRdd = rdd.combineByKey((_, 1), (a, v) => (a._1 + v,  a._2 + 1), (a1, a2) => (a1._1 + a2._1, a1._2 + a2._2))
val avgRdd = combineRdd.map((k, v) => (k, v._1 / v._2))
avg.Rdd.collect.foreach(println)
  • 结果

("a", 2.5)

("b", 4.5)

("c", 9)

sortByKey([ascending], [numPartitions])

  • 说明

    按照key值进行排序,其中key的类型必须实现Ordered接口

  • 示例

val rdd = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val sortRdd = rdd.sortByKey(true)
sortRdd.collect.foreach(println)
  • 结果

(1,"a")

(2,"b")

(3,"c")

(4, "d")

mapValues

  • 说明

    对kv对RDD中的value进行map操作

  • 示例

val rdd = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val upperRdd = rdd.mapValues(_.toUpperCase)
upperRdd.collect.foreach(println)
  • 结果

(1,"A")

(2,"B")

(3,"C")

(4, "D")

join(otherDataset)

  • 说明

    将两个RDD具有相同key的元素进行组合

  • 示例

val rdd1 = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val rdd2 = sc.parallelize(Array((4, 3), (2, 1), (3, 10), (1, 5)))
val joinRdd = rdd1.join(rdd2)
joinRdd.collect.foreach(println)
  • 结果

(1, ("a", 5))

(2, ("b", 1))

(3, ("c", 10))

(4, ("d", 3))

cogroup(otherDataset, [numPartitions])

  • 说明

    将两个KV类型RDD组合,将value形成集合返回

  • 示例

val rdd1 = sc.parallelize(Array((4, "d"), (1,"a"), (2,"b"), (3,"c")))
val rdd2 = sc.parallelize(Array((4, 3), (3, 10), (1, 5)))
val cogroupRdd = rdd1.cogroup(rdd2)
cogroupRdd.collect.foreach(println)
  • 结果

(1, (Compact("a"), Compact(5)))

(2, (Compact("b"), Compact()))

(3, (Compact("c"), Compact(10)))

(4, (Compact("d"), Compact(3)))

相关文章

  • Spark-RDD算子

    map 说明 通过指定变换函数将原有RDD中的元素逐个进行变换,并返回一个新的RDD 示例 结果 246810 ...

  • Spark-RDD行动算子

    reduce(func) 说明通过func函数聚合RDD中所有元素,先聚合分区内元素,再聚合分区间元素 示例val...

  • spark-RDD算子操作分类

    RDD算子操作分类 测试用例说明 前置方法 后置方法 成员变量 1.taansformation(转换) 它可以实...

  • 计算机视觉:matlab实现9种边缘检测算子

    9种边缘算子 Canny算子 Sobel算子 Susan算子 Prewitt算子 Laplace算子 LOG算子 ...

  • spark-rdd

    rdd Resilient Distributed DataSets 容错的 并行的数据结果 transform ...

  • Spark-RDD介绍

    RDD 1 RDD介绍 Driver program:包含程序的main()方法,RDDs的定义和操作。管理节点,...

  • Spark-RDD分区

    RDD分区 在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。所以对...

  • Spark-RDD详解

    1.RDD是什么 RDD:Spark的核心概念是RDD (resilientdistributed data...

  • Spark-RDD简介

    什么是RDD RDD(Resilient Distributed Dataset)称为弹性分布式数据集,是Spar...

  • 1.5.1.1 Spark-RDD

    总目录:https://www.jianshu.com/p/e406a9bc93a9 Hadoop - 子目录:h...

网友评论

      本文标题:Spark-RDD算子

      本文链接:https://www.haomeiwen.com/subject/mwbgqktx.html