rdd = sc.parallelize([('B',1),('B',2),('A',5),('A',4),('A',3)]).repartition(3)
print("partitions details:",rdd.glom().collect())
inOfPartion = (lambda el: [(el, el ** 2)])
mergeValInOfPartion = (lambda agg1,e:agg1+[(e,e**3)])
mergeOutOfPartition = (lambda p1,p2:p1+p2)
rslt = rdd.combineByKey(inOfPartion,mergeValInOfPartion,mergeOutOfPartition)
print("result details:",rslt.collect())
1.rdd 分为三个分区,分区的数据如下:
partitions details: [[], [('B', 1), ('A', 5), ('A', 3)], [('B', 2), ('A', 4)]]
分区1:[]
分区2:[('B', 1), ('A', 5), ('A', 3)]
分区3:[('B', 2), ('A', 4)]
inOfPartion : 针对于同一个分区相同健的第一个值的处理,比如分区2中键为A,inOfPartion处理的就是('B',1),('A',5), 而不是('A',3),处理方法为:[(e1,e1**2)]
mergeValInOfPartion : 在inOfPartion的基础上,处理同一分区相同健的不同值,比如
('A',3), 对于这个值的处理方法是: [(e,e**3)] (当然怎么处理自己定义)
mergeOutOfPartition : 处理不同的分区相同的健的值,如把第二分区中A与第三分区中的值进行合并处理。最后处理的结果如下:
result details: [('B', [(1, 1), (2, 4)]), ('A', [(5, 25), (3, 27), (4, 16)])]
注意在以后的使用过程中,尽量使初始值的处理方式与对相同健值的处理方式一样,否则不一样的话,数据的顺序可能会导致不同的结果
如果将A中的顺序颠倒,结果就不一样了,如下所示:
rdd = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]).repartition(3)
result details: [('B', [(1, 1), (2, 4)]), ('A', [(3, 9), (5, 125), (4, 16)])]
网友评论