4.1 动机
Spark为包含键值对类型的RDD提供了一些专有的操作——pair RDD。pair RDD是很多程序的构成要素,因为他们提供并行操作各个键或跨节点重新进行数据分组的操作接口。
4.2 创建Pair RDD
当需要把一个普通的RDD转为pair RDD,可以调用map()函数来实现,传递函数需要返回键值对。
在python中,为了让提取键值对之后的数据能够在函数中调用,需要返回一个由二元组组成的RDD。
pairs = lines.map(lambda x:(x.split("")[0],x))
当用Scala或者python从一个内存数据集创建pair RDD时,只需要对这个由二元组组成的集合调用SparkContext.parallelize()方法。
4.3 Pair RDD的转化操作
pair RDD支持所有RDD支持的函数
4.3.1 聚合操作
reduceByKey()会为数据集中的每个键进行并行的规约操作,每个规约操作会将键相同的值合并起来,返回一个由各键和对应键规约出来的结果值组成的新的RDD,所以是转化操作。
foldByKey()使用一个与RDD和合并函数中的数据类型相同的零值作为初始值,所使用的合并函数对零值与另一个元素进行合并,结果仍为该元素。
combineByKey()是最为常用的基于键进行聚合的函数。combineByKey会遍历分区中所有的元素,如果访问的是一个新元素,combineByKey()会使用一个叫createCombiner()的函数来创建那个键对应的累加器的初始值。这个过程会发生在每个分区。如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。mergeCombiners()方法将来自各分区的累加器结果合并。
sumCount = nums.combineByKey((lambda x:(x,1),
(lambda x,y:(x[0]+y,x[1]+1)),
(lambda x,y:(x[0]+y[0],x[1]+y[1])))
sumCount.map(lambda key,xy:(key ,xy[0]/xy[1])).collectAsMap()
并行度调优
每个RDD都有固定数目的分区,分区数目决定了在RDD上执行操作时的并行度。spark会猜测集群的大小给出一个默认的分区数,但是有时需要你自己对并行度进行调优获得一个好的性能提升。
本章讨论的绝大多数操作符都可以接受第二个参数,即指定分组结果或聚合结果的RDD分区数。spark提供了repartition()函数用于分组和聚合之外操作的分区调优,它会把数据通过网络进行混洗,并创建出新的分区集合,混洗开销大,优化版的repartition叫做coalesce(),在Python中,rdd.getNumPartition()查看RDD的分区数。
4.3.2 数据分组
若数据已经以预期方式提取了键,groupByKey()就会使用RDD中的键来对数据进行分组。
groupBy()可用于未成对数据,也可以根据除键相同以外的条件进行分组。
cogroup()对多个共享一个键的RDD进行分组。
4.3.3 链接
——连接方式:右外连接、左外连接、交叉连接、内连接。
普通的join为内连接。只有在两个pair RDD中都存在的键才叫输出。
leftOuterJoin(other)结果pair RDD中,源RDD的每一个键都有对应的记录。每个键相应的值是由一个源RDD中的值与一个包含第二个RDD的值的option对象组成的二元组。
rightOuterJoin()预期结果中的键必须出现在第二个RDD中,而二元组中的可缺失的部分则来自于源RDD而非第二个RDD。
4.3.4 数据排序
sortByKey()函数接收一个ascending的参数,表示我们是否想要让结果按升序排序(默认true)。当然我们也可以提供自定的比较函数进行自定义的排序。
rdd.sortByKey(ascending = True,numPartition = None,keyfunc = lambda x :str(x))
4.4 Pair RDD的行动操作
所有基础的RDD行动操作也都在pair RDD上可用。
countByKey()对每个键对应的元素分别计数
collectAsMap()将结果以映射表的形式返回,以便查询。
lookup()返回给定键对应的所有值
4.5 数据分区(进阶)
spark程序可以通过控制RDD分区方式减少通信开销。分区并不是一直都有好处,只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会帮助。
经管spark没有给出显示控制每个键具体落在哪一个工作节点上的方法,但spark可以确保同一组的键出现在同一个节点上。
在Python中,不能将HashPartitioner对象传给partitionBy,而只需把需要的分区数传递过去。
4.5.1 获取RDD的分区方式(略)
4.5.2 从分区中获益的操作
Spark的许多操作都引入了将数据根据跨节点进行混洗的过程。而这些操作都能从混洗中获益(比如避免数据倾斜),但像join这样的二元操作,预先进行数据分区会导致其中至少一个RDD不发生数据混洗(因为,当其中一个RDD打乱后,在join的时候,这种关联就打乱了。混洗通常也是针对某个key而言,找到一个分布比较均衡的key作为混洗对象,有益于join等二元操作)。
4.5.3 影响分区方式的操作
Spark内部知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果RDD自动设置为对应的分区器(参考哈希分区)。不过,转化操作的结果不一定会按已知的分区方式分区,这时输出的RDD可能就会没有设置分区器。
最后,对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。
4.5.4 示例:PageRank
——用来根据外部文档指向一个文档的链接,对集合中每个文档的重要程度赋一个度量值。它是执行多次连接的一个迭代算法。
两个数据集:(PageID ,linkList)和(pageID,rank)
核心:在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p),将每个页面的排序值设为0.15+0.85*contributionsReceived
注:为了最大化分区相关的优化潜在作用,你应该在无需改变元素的键时尽量使用mapValues()或flatMapValues()。
4.5.5自定义分区方式
好的partitioner可以减少很多的网络通信开销,Spark提供HashPartitioner和RangePartitioner分区方式满足绝大部分的用例,同时也允许自定义的分区方式。
实现自定义的分区器,需要继承org.apache.spark.Partitioner类实现下面三个方法:
1)numPartitioner:Int:返回创建出来的分区数。
2)getPartitioner(key:Any):Int:返回给定键的分区编号(0到numPartitions-1)。
3)equals():Java判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样spark才可以判断两个RDD的分区方式是否相同。
在Python中,不需要扩展partitioner类,而是把一个特定的哈希函数作为一个额外的参数传给RDD.partitionBy()函数。
import urlparse
def hash_domain(url):
return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20,hash_domain)#创建20个分区
如果你想要对多个RDD使用相同的分区方式,就应该使用相同的一个函数对象,比如一个全局函数,而不是为每个RDD创建一个新的函数对象。
网友评论