map
# map: the lambda runs once per element and each input produces exactly one
# output element -- here a (double, square) tuple.
(
    sc.parallelize([4, 5, 7, 3, 8, 6, 2, 9, 1])
    .map(lambda x: (x * 2, x ** 2))
    .collect()
)
[(8, 16),
(10, 25),
(14, 49),
(6, 9),
(16, 64),
(12, 36),
(4, 4),
(18, 81),
(2, 1)]
flatMap
# flatMap: same lambda as the map example, but the tuple returned for each
# element is flattened into the result instead of being kept as one item.
(
    sc.parallelize([4, 5, 7, 3, 8, 6, 2, 9, 1])
    .flatMap(lambda x: (x * 2, x ** 2))
    .collect()
)
[8, 16, 10, 25, 14, 49, 6, 9, 16, 64, 12, 36, 4, 4, 18, 81, 2, 1]
glom
# glom: gather the elements of each partition into a list, so collect()
# returns one list per partition (two here).
(
    sc.parallelize([4, 5, 7, 3, 8, 6, 2, 9, 1, 0], 2)
    .glom()
    .collect()
)
[[4, 5, 7, 3, 8], [6, 2, 9, 1, 0]]
mapPartitions
# Ten integers distributed over two partitions.
rdd = sc.parallelize([4, 5, 7, 3, 8, 6, 2, 9, 1, 0], numSlices=2)
def f(iter):
    """Yield the sum of the elements of one partition.

    Intended to be passed to ``RDD.mapPartitions``: it receives an iterable
    over a single partition's elements and, being a generator, produces one
    value for that partition.

    Args:
        iter: Iterable of numbers (one partition's contents). The name is
            kept from the original snippet; it shadows the builtin ``iter``
            only inside this function.

    Yields:
        The sum of all elements in ``iter`` (0 for an empty partition).
    """
    yield sum(iter)
# Reduce every partition to its sum, then print the per-partition layout of
# the source RDD followed by that of the result.
y = rdd.mapPartitions(f)
for dataset in (rdd, y):
    print(dataset.glom().collect())
[[4, 5, 7, 3, 8], [6, 2, 9, 1, 0]]
[[27], [18]]
mapPartitionsWithIndex
# Ten integers distributed over two partitions.
rdd = sc.parallelize([4, 5, 7, 3, 8, 6, 2, 9, 1, 0], numSlices=2)
def f(index, iter):
    """Yield ``(index, total)`` for one partition.

    Intended to be passed to ``RDD.mapPartitionsWithIndex``: it receives the
    partition's index together with an iterable over that partition's
    elements and, being a generator, produces one tuple for the partition.

    Args:
        index: Index of the partition being processed.
        iter: Iterable of numbers (one partition's contents). The name is
            kept from the original snippet; it shadows the builtin ``iter``
            only inside this function.

    Yields:
        A ``(index, sum_of_elements)`` tuple.
    """
    yield (index, sum(iter))
# Tag each partition's sum with the partition index, then print the
# per-partition layout of the source RDD followed by that of the result.
y = rdd.mapPartitionsWithIndex(f)
for dataset in (rdd, y):
    print(dataset.glom().collect())
[[4, 5, 7, 3, 8], [6, 2, 9, 1, 0]]
[[(0, 27)], [(1, 18)]]
getNumPartitions
# getNumPartitions: report how many partitions the RDD was split into
# (the second argument to parallelize requests two).
(
    sc.parallelize([4, 5, 7, 3, 8, 6, 2, 9, 1, 0], 2)
    .getNumPartitions()
)
2
distinct
# distinct: drop duplicate elements, keeping one copy of each value.
(
    sc.parallelize([1, 2, 3, 4, 5, 1, 2, 3, 4, 5])
    .distinct()
    .collect()
)
[1, 2, 3, 4, 5]
sample
# sample(withReplacement, fraction): returns a new, randomly sampled RDD.
# Five independent samples are drawn without replacement; as the recorded
# output shows, their sizes and contents vary from draw to draw.
rdd = sc.parallelize(range(7), 2)
samList = [rdd.sample(False, 0.5) for _ in range(5)]
print('rdd.collect()的值是{}'.format(rdd.collect()))
# Each sample is itself an RDD, so it must be collected before printing.
for index, d in enumerate(samList):
    print('sample: {0} y = {1}'.format(index, d.collect()))
rdd.collect()的值是[0, 1, 2, 3, 4, 5, 6]
sample: 0 y = [0, 1, 3, 5]
sample: 1 y = [0, 1, 6]
sample: 2 y = [5, 6]
sample: 3 y = [0, 1, 2, 3, 5, 6]
sample: 4 y = [0, 5]
takeSample
# takeSample(withReplacement, num): draws exactly `num` elements (4 here)
# and hands them back as a plain list rather than an RDD, which is why the
# loop below prints `d` directly without calling collect().
rdd = sc.parallelize(range(15), 2)
samList = [rdd.takeSample(False, 4) for _ in range(5)]
# NOTE: the label says rdd.collect(), but the value printed is
# rdd.glom().collect(), i.e. the elements grouped per partition. The label
# is left unchanged so it still matches the recorded output.
print('rdd.collect()的值是{}'.format(rdd.glom().collect()))
for index, d in enumerate(samList):
    print('sample: {0} y = {1}'.format(index, d))
rdd.collect()的值是[[0, 1, 2, 3, 4, 5, 6], [7, 8, 9, 10, 11, 12, 13, 14]]
sample: 0 y = [14, 11, 1, 12]
sample: 1 y = [10, 14, 2, 11]
sample: 2 y = [12, 2, 10, 14]
sample: 3 y = [12, 2, 14, 5]
sample: 4 y = [3, 8, 7, 10]
union
# union: concatenate two RDDs; duplicates (within or across the inputs)
# are kept.
rdd = sc.parallelize([1, 1, 2, 3])
rdd1 = sc.parallelize([5, 3, 4, 6])
combined = rdd.union(rdd1)
print(combined.collect())
[1, 1, 2, 3, 5, 3, 4, 6]
intersection
# intersection: keep only the values that occur in both RDDs.
rdd = sc.parallelize([1, 1, 2, 3])
rdd1 = sc.parallelize([5, 3, 4, 6])
common = rdd.intersection(rdd1)
print(common.collect())
[3]
sortByKey
# sortByKey: sort (key, value) pairs by key. The second argument sets how
# many partitions the sorted result is spread over, which the glom() output
# makes visible.
tmp = [('a', 1), ('f', 2), ('d', 3), ('c', 4), ('b', 5)]
rdd = sc.parallelize(tmp, numSlices=2)
print(rdd.glom().collect())
sort1 = rdd.sortByKey(ascending=True, numPartitions=1).glom().collect()
sort2 = rdd.sortByKey(ascending=True, numPartitions=2).glom().collect()
for partitions in (sort1, sort2):
    print(partitions)
[[('a', 1), ('f', 2)], [('d', 3), ('c', 4), ('b', 5)]]
[[('a', 1), ('b', 5), ('c', 4), ('d', 3), ('f', 2)]]
[[('a', 1), ('b', 5), ('c', 4)], [('d', 3), ('f', 2)]]
未完待续……
网友评论