美文网首页
SparkCore(一)(RDD和一些算子)

SparkCore(一)(RDD和一些算子)

作者: 八爪鱼下水 | 来源:发表于2021-03-29 21:02 被阅读0次

什么是RDD

RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。

  • 一组分区(Partition),即数据集的基本组成单位;

  • 一个计算每个分区的函数;

  • RDD之间的依赖关系;

  • 一个Partitioner,即RDD的分片函数;

  • 一个列表,存储存取每个Partition的优先位置(preferred location)。

RDD创建的方法

  • 从集合中创建 并行度一般为2
##makerdd或parallise都是根据totalcpucores和2比较最大值
##如果直接覆盖makerdd或parallise的第二个分区个数的参数可以改变数量
 override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

#查看源码所得
sc.parallelize

#makeRDD实际上是在内部创建了一个parallelize
sc.makeRDD

  • 从文件中转换
#从文件转换RDD
sc.textFile

#从文件夹拉取多个文件
sc.wholeTextFiles("data/baseinput/ratings100/")

  • textFile在读取小文件的时候,会参考小文件的个数,文件个数越多,分区个数越多

  • sc.textFile遇到小文件没有办法很好合并小文件的,即便重写第二个参数也没有作用

  • 用textFile时,它的partition的数量是与文件夹下的文件数量(实例中用3个xxx.log文件)相关,一个文件就是一个partition(既然3个文件就是:partition=3)。

  • wholeTextFiles的partition数量是根据用户指定或者文件大小来(文件内的数据量少 有hdfs源码默认确定的)

  • 确定与hdfs目录下的文件数量无关!所以说:wholeTextFile通常用于读取许多小文件的需求。

查看RDD分区的shell命令

#从集合中创建
sc.parallelize(Seq(1,2,3,4))

#查看分区数量(并行数量)
res3.getNumPartitions

#查看分区并行数量的内容
#将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

res3.glom.collect

#查看分区数量(并行数量)
res3.partitions.length

关于DRR分区决定因素

  • 第一点:RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源;

  • 第二点:在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍;

  • 第三点:RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关

partitionBy 改变分区

解析:

  • 对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD.
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24

scala> rdd.partitions.size
res24: Int = 4

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26

scala> var rdd3 = rdd.partitionBy(new org.apache.spark.RangePartitioner(2,rdd))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[40] at partitionBy at <console>:25

scala> rdd2.partitions.size
res25: Int = 2
scala> rdd2.glom.collect
res26: Array[Array[(Int, String)]] = Array(Array((2,bbb), (4,ddd)), Array((1,aaa), (3,ccc)))
scala> rdd3.glom.collect
res27: Array[Array[(Int, String)]] = Array(Array((1,aaa), (2,bbb)), Array((3,ccc), (4,ddd)))

注意:Spark采用的分区有三种:

  • 水平分区,也就是sc.makerdd按照下标元素划分,

  • Hash划分根据数据确定性划分到某个分区,一般只给定分区数。

  • Range分区该方法一般按照元素大小进行划分不同区域,每个分区表示一个数据区域,如数组中每个数是[0,100]之间的随机数,Range划分首先将区域划分为10份,然后将数组中每个数字分发到不同的分区,比如将18分到(10,20]的分区,最后对每个分区进行排序。

RDD编程

在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。

要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。

RDD的转化 ( 重点掌握 )

RDD整体上分为 TRANSFORMATIONS 跟 ACTIONS 两种

Value类型

map(func) 重点

将RDD创建的集合转换为另外一个映射集合,例如,如果将一个Array中的数全部 *2 输出,那么就会用到map方法。例如

//创建一个array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合内每个元素*2
scala> res0.map(_*2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印输出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)        
mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。同样以上述的需求为例:

//创建一个array
scala> sc.makeRDD(1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25


//集合内每个元素*2
scala> res0.mapPartitions(x=>x.map(_*2))
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27


//打印输出
scala> res1.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)       
mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];

glom

将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func) 重点

分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26

scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

上述例子解释是创建一个1到4的序列,然后把能被2整除的放进一个元祖中,不能被2整除的放入另外一个元祖中。那么分组的条件就是%2

filter(func) 重点

过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。比如创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)

scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

sortBy(func,[ascending], [numTasks]) 重点

使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。

//创建一个RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

//按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)

//按照与3余数的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)

Key-Value类型

partitionBy

pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。

groupByKey

作用:groupByKey也是对每个key进行操作,但只生成一个sequence。

//创建一个pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

//将相同key对应值聚合到一个sequence中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

//打印结果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

//计算相同key对应值的相加结果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

//打印结果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

//创建一个pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

//算相同key对应值的相加结果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

//打印结果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))

reduceByKey和groupByKey的区别

1.reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].

2.groupByKey:按照key进行分组,直接进行shuffle。

aggregateByKey

在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

(1)zeroValue:给每一个分区中的每一个key一个初始值;

(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;

(3)combOp:函数用于合并每个分区中的结果。

//创建一个pairRDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

//取出每个分区相同key对应值的最大值,然后相加
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26

//打印结果
scala> agg.collect()
res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
reduceByKey和groupByKey的区别
  1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
  1. groupByKey:按照key进行分组,直接进行shuffle。
val createCombiner = (v: V) => CompactBuffer(v) ,它把一个V变成一个C(例    如,创建一个单元素列表)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v ,将一个V合并到一个C中(例如,将它添加到列表的末尾)
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 ,将两个C合并成一个C。
  1. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

相关文章

  • SparkCore(一)(RDD和一些算子)

    什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中...

  • Spark性能调优-RDD算子调优篇(深度好文,面试常问,建议收

    RDD算子调优 不废话,直接进入正题! 1. RDD复用 在对RDD进行算子时,要避免相同的算子和计算逻辑之下对R...

  • Spark之RDD算子-创建算子

    RDD算子是Spark计算框架中定义的对RDD进行操作的各种函数,从RDD算子的功能可将RDD算子分为四类,创建算...

  • RDD常见算子

    RDD算子的分类 RDD算子从对数据操作上讲,大致分为两类: 转换(transformations)和行动(act...

  • spark面试题

    1、rdd有哪些算子? 主要分为转换算子和action算子。 transformation:map、filte...

  • RDD算子

    一、RDD算子简介 RDD算子分为两类:Transformation(转换)与Action(行动)Transfor...

  • SparkCore之RDD

    RDD 五大特性 A list of partitions一组分区:多个分区,在RDD中用分区的概念。 A fun...

  • Spark的算子(函数)

    Spark的算子 1、RDD基础 什么是RDD? RDD(Resilient Distributed Datase...

  • Spark 控制算子源码解析

    Spark 控制算子源码解析 RDD persist() 算子 使用指定的level来标记RDD进行存储。 可以看...

  • 算子

    spark--------- rdd算子学习 ---------转换算子: 返回值还是一个rdd就是转换是懒加载的...

网友评论

      本文标题:SparkCore(一)(RDD和一些算子)

      本文链接:https://www.haomeiwen.com/subject/nuxihltx.html