2020/07/04 -
引言
这部分内容在我当时处理流量数据的时候是用的最多的东西,即使后续的时候是使用DataFrame形式的数据,其实也是按照key/value形式来进行这部分操作。我应该明白,这种形式的数据是处理的基础,对比hadoop的处理方式就明白了,最简单的例子就是wordcount,就是按照word为key;然后即使是DataFrame有时候也会经常使用这种形式来进行聚合,或者groupby这类操作。上面的内容是说,这种操作的重要性,但是没有说明为什么要做这种操作?这个问题应该结合数据来进行回答,让我凭空来回答这个东西,实际上我也弄不清楚。其实数据库也是做这些操作。你说,他能groupby,你只说出了这个东西有个这个功能,但是你没有说出这个功能的实际价值。当然,让我举例子,我也能举出来,比如说,我按照某个ip为键,然后group这个东西,其实就能看到这个IP的流量流量数据,交互的节点个数等等。
但我觉得,应该按照这种操作是一种原子操作来理解,那么也就是说,在解决问题的时候,我将我的问题转化为或者将某个步骤分解为这种按照key/value来处理的过程。这样来理解,就潜在中明白了这个东西能干什么。
这章的主要内容有以下几个方面:
1)将数据转化为key/value形式
2)key/value形式的数据都支持哪些操作
3)数据分区,partitioning
key/value形式的RDD同样拥有两种形式的操作,一种是针对单个RDD中的key进行聚合或者什么操作,另一种是面向两个RDD,对他们相同的key来进行一些操作。
小节内容:
- 创建key/value对RDD
- 面向pair RDD的转化操作
- pair RDD的动作
- 高级数据分区
- 小节
1. 创建key/value的对RDD
在之前的操作中,使用sc.textFile来获取RDD得到是按行组织的数据,这种形式属于最基础的形式,并不是pair形式,进行一定的转化。创建pairRDD有多种方式,在后续第五章中讲解的加载数据的形式中,是直接返回pair形式的RDD的,这让我想起来当时处理hadoop形式数据的时候那种感觉。另外一种就是利用map将前面创建的文本形式数据转化为pair形式的RDD。
总结一下就是两种方案:1)利用加载数据的形式,直接返回pairRDD,这种应该会需要数据的支持,同时指定类似分隔符一样的东西。2)通过map转化为pairRDD。转化方法一般就是将数据中的某些东西转化为元组(),但是经过测试,转化为[]也是可以的。但是数据必须是只有两个数据,不然groupByKey就会报错。错误信息是
for k, v in iterator:
ValueError: too many values to unpack
所以就是说,只能是两个元素的元组或者数组。
2. 面向pair RDD的转化操作
对于pair RDD,之前第三章中讲到的转化操作都是可行的,但是需要注意的是,函数必须是处理两个参数。而对于能够处理key的转化操作,可以不用考虑这个问题。
大致的操作如下:
单个RDD按key操作
key
其中groupByKey返回的RDD在collect之后,他的数据是不展示的,返回的是一个数组,然后每个元素是一个key和可遍历的数据,这个可遍历的数据是不展示的。
groupByKey
然后还有一个,我测试过程中发现的,groupBy这个函数,他最少一个有一个参数,这个参数是一个函数。
他的工作流程是这样的,就是按照这个函数返回的数值作为键,这个我感觉其实不是面向对RDD的,但是我没有在前一章节中找到这个函数,也就是说,这个书上并没列举出所有的东西。
聚合操作
reduceByKey,类似reduce,但是是按照key来进行,返回一个RDD。
flodByKey类似flod
通过reduceByKey和mapValues可以获取每个key的平均值,他是这么操作的,感觉稍有复杂。
a.mapValues(lambda x:(x,1)).reduceByKey(lamdba x,y : x[0] + y[0], x[1] + y[1])
这个流程是这样,先将每个key的value都按照1计数,然后统计总和和个数
但是我感觉是不是应该有一些更好的方式呢?就是有内置的函数可以帮助处理这些事情。我记得pandas就有这种,就是agg().aver类似这种。(留个疑问)
为了实现单词技术的方式,(这里RDD中的元素是每行都是文本),然后先使用flatMap将每一个元素变为单词,然后map(x,1),最后reduceByKey。
x.flatMap(Lambda x:x.split(" ")).map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)
但是通过下面这种方式会更快x.flagMap(lamdba x:x.split(" ")).countByValue()
最后一个combineByKey类似combine,他能够提供更加灵活的组合方式,他是现在单个分区上进行某种操作,主要是三个参数,都是函数:1)第一次在这个分区见到这个数据的时候2)第二次见到这个key的时候3)在多个分区之间进行组合的时候
而且其他的对key进行操作的流程都是基于这个借口来做的。但一般推荐直接使用相应的函数。我暂时不是很理解这个函数的作用在什么地方。
调整并行度
我刚开始的时候,还挺疑惑,就是为什么上面提到的这些函数都有一个numPartitions参数。一般来说,不需要调整这个东西的参数,spark会自动来弄。但是这个东西存在的意义是什么呢?我仔细思考了一下有点明白了。正常情况下,rdd本身就是多个分区的,然后一些操作的并行度也是这个数量来决定的。但是,就比如hadoop最后的mapreduce过程,最后的时候,这些数据都被重新分配了,那么这个时候多少个分区,是可以用户来指定的。通过指定这个分区来调整后续的性能。
不过这个数量,我也不是很清楚,是为什么呢?是不是一些导致这个RDD碎片化太严重呢?
当在执行这些操作之外的地方进行分区的调整的话,可以使用repartition或者coalesce。repartition会完全打乱数据然后重新分区,这个过程就会导致中间大量的网络交互,coalesce是一个优化版,但是也仅限于减少RDD分区的时候。
重新分区
但是最好,首先通过getNumPartitions()知道一下有多少个分区。
group数据
这个过程前面已经说过了,就是返回一个列表,但是如果你要进行的操作是可以利用reduceByKey完成的,那就是hi用reduceByKey的形式,这种更快。
同时group还能在两个RDD上操作
其他的还有join,排序
3. pair RDD的动作
动作4. 高级数据分区
在分布式的数据处理过程中,如果有大量的网络交互,那么大量的时间就会消耗在这个过程中;我对这个事情深有体会,就是一旦有数据倾斜,在读取数据的时候就很长时间。
对于那种只使用一次的数据来说,分区数量是无所谓的;而那种能够多次使用的,特别是key/value形式的操作就能有效提高性能。(个人猜测是因为按照key来进行重分区,所以有些操作,其实是对性能没有影响的,这让我想起来了,就是他的那个任务图上的时间信息,这些估计就是能够调优的部分;其实我觉得,这个分区是不是也跟worker这些信息有关呢,就是调整这个过程有没有关系呢?)
示例1
一个流式处理的场景,在程序运行后,一个用户表将被读取,后续的操作是,没五分钟与新来的数据进行join。那么这个join的过程呢,就是两个数据都进行打乱,然后按照key发往同一个机器。
join示例
因为这是一个流式场景,也就是所,每次执行他都要这样进行,这里面多余的操作就是userData进行传递的过程,也就是左边的数据部分。
添加partitionBy再添加了这个东西之后,就可以再首次的时候将这个数据进行重新分区。注意看这里的一个东西,persist(),这个东西我觉得应该很重要,这个可能也是影响性能的关键。
对于这种调用过partitionBy函数的RDD,spark会利用这个信息,后续的时候在进行数据传输就会不一样。
使用partitionBy之后的join
这样弄的结果,就是只有events被shuffle。那么后面为什么还要添加一个persist呢。
为什么要persist
原因是partitionBy是一个转化操作,他是返回一个新的RDD,那么也就是说,如果你不cache或者persist的话,每次还是再原来的RDD上重新来计算。所以这一步很关键。整体上就是因为分区后的RDD才是我要多次使用的RDD。下面这张图是83页的内容,也说明了这个问题。
另一个证明
其他的一些按照key来进行操作的函数,都会使用分区后的信息来增快速度;但map这种就没有这种效果,反而会导致失去分区信息。
python中partitionBy的参数只需要传递分区个数即可。
关于使用partitionBy的过程中,一方面要考虑你的这些操作是否能从partitionBy中收到好处;另一方面,就是你的操作会不会将原始的分区好的RDD给打乱,这是需要仔细考虑的(p83)。例如map,不要想着你的操作如果产生了键值对,他就会帮你重新分,或者必然分,使用了map,spark就默认认为你的分区乱了。
同时还有一些针对两个RDD的操作,他们生成的新的RDD的分区取决于上层的RDD。如果两个都有分区,取决于第一个(这个第一个是什么意思呢?),反正就是其中的一个;如果是只有其中一个有,那就是取决于这个分区。
示例2 PageRank
这个示例前面的差不多,处理的逻辑稍微复杂了一点。这里要注意的地方
1)在partitionBy之后,要使用persist进行持久化
2)为了不打乱分区信息,不要使用map,而是用mapValues.
这里再多说两句,你可以看到,partitionsBy在这里的示例中,都是直接在textFile之后进行的;而如果你在别的地方使用的话,但是这个也是惰性操作,也就是说,他也是actions来启动,在这个过程中shuffle数据。
5. 小节
本章节主要讲解了pair类型的RDD的基础操作,同时详细介绍了分区的特性。在实际操作过程中,分区的特性可以大额提高性能,他主要就是通过分区之后,然后固化,来减少这个整体的网络消耗。
网友评论