pyspark

Author: Tim在路上 | 2018-09-17 20:33

    pyspark version

    # output the Spark version
    print("pyspark version: " + str(sc.version))
    map

    # sc = spark context, parallelize creates an RDD from the passed object
    x = sc.parallelize([1,2,3])
    y = x.map(lambda x: (x,x**2))

    # collect copies RDD elements to a list on the driver
    print(x.collect())
    print(y.collect())
    [1, 2, 3]
    [(1, 1), (2, 4), (3, 9)]
    parallelize creates the RDD in parallel, map transforms each element, and collect copies the results back to the driver.
    map produces exactly one output element for every input element: here 1->1, 2->4, 3->9.
    flatMap
    x = sc.parallelize([1,2,3])
    y = x.flatMap(lambda x: (x, 100*x, x**2))
    print(x.collect())
    print(y.collect())
    [1, 2, 3]
    [1, 100, 1, 2, 200, 4, 3, 300, 9]
    With flatMap, each source element may produce several elements in the new RDD.
    Here 1 -> (1, 100, 1), 2 -> (2, 200, 4), 3 -> (3, 300, 9).
    mapPartitions
    mapPartitions is a variant of map: map applies its function to each element of the RDD, while mapPartitions applies its function to each partition, processing the contents of a partition as a whole.

    x = sc.parallelize([1,2,3], 2) # the second argument is the number of partitions
    def f(iterator): yield sum(iterator) # sums the elements of one partition
    y = x.mapPartitions(f)

    # glom() flattens elements on the same partition
    print(x.glom().collect())
    print(y.glom().collect())
    [[1], [2, 3]]
    [[1], [5]]
    mapPartitionsWithIndex
    The same as mapPartitions, except that the partition index is passed to the function as well.
    x = sc.parallelize([1,2,3], 2)
    def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator))
    y = x.mapPartitionsWithIndex(f)

    # glom() flattens elements on the same partition
    print(x.glom().collect())
    print(y.glom().collect())
    [[1], [2, 3]]
    [[(0, 1)], [(1, 5)]]
    getNumPartitions
    Returns the number of partitions of the RDD.
    x = sc.parallelize([1,2,3], 2)
    y = x.getNumPartitions()
    print(x.glom().collect())
    print(y)
    [[1], [2, 3]]
    2
    filter

    x = sc.parallelize([1,2,3])
    y = x.filter(lambda x: x%2 == 1) # filters out even elements
    print(x.collect())
    print(y.collect())

    [1, 2, 3]
    [1, 3]
    Keeps only the elements for which the predicate returns True.
    distinct

    x = sc.parallelize(['A','A','B'])
    y = x.distinct()
    print(x.collect())
    print(y.collect())

    ['A', 'A', 'B']
    ['A', 'B']
    Removes duplicate elements; note that the order of the result is not guaranteed.
    sample

    x = sc.parallelize(range(7))

    # call 'sample' 5 times
    ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)]
    print('x = ' + str(x.collect()))
    for cnt,y in zip(range(len(ylist)), ylist): # zip pairs the indices with the sampled RDDs
        print('sample:' + str(cnt) + ' y = ' + str(y.collect()))

    x = [0, 1, 2, 3, 4, 5, 6]
    sample:0 y = [0, 2, 5, 6]
    sample:1 y = [2, 6]
    sample:2 y = [0, 4, 5, 6]
    sample:3 y = [0, 2, 6]
    sample:4 y = [0, 3, 4]
    Random sampling: each element is kept with probability fraction, so the size of the result varies from run to run.
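    For reproducible sampling, sample also accepts a seed; a minimal sketch (the exact elements drawn may differ across Spark versions):

    y = x.sample(withReplacement=False, fraction=0.5, seed=42)
    print(y.collect())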
    takeSample

    x = sc.parallelize(range(7))

    # call 'takeSample' 5 times
    ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)]
    print('x = ' + str(x.collect()))
    for cnt,y in zip(range(len(ylist)), ylist):
        print('sample:' + str(cnt) + ' y = ' + str(y)) # no collect on y: takeSample already returns a list

    x = [0, 1, 2, 3, 4, 5, 6]
    sample:0 y = [0, 2, 6]
    sample:1 y = [6, 4, 2]
    sample:2 y = [2, 0, 4]
    sample:3 y = [5, 4, 1]
    sample:4 y = [3, 1, 4]
    Randomly draws num elements and returns them as a plain list on the driver.
    union

    x = sc.parallelize(['A','A','B'])
    y = sc.parallelize(['D','C','A'])
    z = x.union(y)
    print(x.collect())
    print(y.collect())
    print(z.collect())

    ['A', 'A', 'B']
    ['D', 'C', 'A']
    ['A', 'A', 'B', 'D', 'C', 'A']
    Concatenates the two RDDs without removing duplicates.
    intersection

    x = sc.parallelize(['A','A','B'])
    y = sc.parallelize(['A','C','D'])
    z = x.intersection(y)
    print(x.collect())
    print(y.collect())
    print(z.collect())
    ['A', 'A', 'B']
    ['A', 'C', 'D']
    ['A']
    Intersection of the two RDDs; the result contains no duplicates.
    sortByKey

    x = sc.parallelize([('B',1),('A',2),('C',3)])
    y = x.sortByKey()
    print(x.collect())
    print(y.collect())
    [('B', 1), ('A', 2), ('C', 3)]
    [('A', 2), ('B', 1), ('C', 3)]
    Sorts the (key, value) pairs by key, ascending by default.
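    Pass ascending=False to reverse the order; a minimal sketch:

    y = x.sortByKey(ascending=False)
    print(y.collect())

    [('C', 3), ('B', 1), ('A', 2)]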
    glom

    x = sc.parallelize(['C','B','A'], 2)
    y = x.glom()
    print(x.collect())
    print(y.collect())

    ['C', 'B', 'A']
    [['C'], ['B', 'A']]
    Gathers the elements of each partition into a list, yielding one list per partition.
    cartesian

    x = sc.parallelize(['A','B'])
    y = sc.parallelize(['C','D'])
    z = x.cartesian(y)
    print(x.collect())
    print(y.collect())
    print(z.collect())

    ['A', 'B']
    ['C', 'D']
    [('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]
    Returns the Cartesian product: every pairing of an element of x with an element of y.
    groupBy

    x = sc.parallelize([1,2,3])
    y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' )
    print(x.collect())

    # y is nested, this iterates through it
    print([(j[0],[i for i in j[1]]) for j in y.collect()])

    [1, 2, 3]
    [('A', [1, 3]), ('B', [2])]
    Groups the elements by the key returned by the function.
    pipe

    x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
    y = x.pipe('grep -i "A"') # calls out to grep, may fail under Windows
    print(x.collect())
    print(y.collect())

    ['A', 'Ba', 'C', 'AD']
    ['A', 'Ba', 'AD']
    pipe streams the RDD elements through an external shell command, one element per line.
    foreach
    from __future__ import print_function
    x = sc.parallelize([1,2,3])
    def f(el):
        '''side effect: append the current RDD element to a file'''
        f1 = open("./foreachExample.txt", 'a+')
        print(el, file=f1)

    # first clear the file contents
    open('./foreachExample.txt', 'w').close()

    y = x.foreach(f) # writes into foreachExample.txt

    print(x.collect())
    print(y) # foreach returns 'None'

    # print the contents of foreachExample.txt
    with open("./foreachExample.txt", "r") as foreachExample:
        print(foreachExample.read())

    [1, 2, 3]
    None
    3
    1
    2
    Applies the function to each element for its side effects. Elements on different partitions may be processed concurrently, so no ordering is guaranteed (note the file contents above: 3, 1, 2).
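    Because foreach runs only for its side effects, an accumulator is the usual way to collect a result from it; a minimal sketch (not from the original post):

    acc = sc.accumulator(0)
    x = sc.parallelize([1,2,3])
    x.foreach(lambda el: acc.add(el)) # each task adds its elements to the shared accumulator
    print(acc.value)

    6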
    foreachPartition

    from __future__ import print_function
    x = sc.parallelize([1,2,3],5)
    def f(partition):
        '''side effect: append the current RDD partition contents to a file'''
        f1 = open("./foreachPartitionExample.txt", 'a+')
        print([el for el in partition], file=f1)

    # first clear the file contents
    open('./foreachPartitionExample.txt', 'w').close()

    y = x.foreachPartition(f) # writes into foreachPartitionExample.txt

    print(x.glom().collect())
    print(y) # foreachPartition returns 'None'

    # print the contents of foreachPartitionExample.txt
    with open("./foreachPartitionExample.txt", "r") as foreachPartitionExample:
        print(foreachPartitionExample.read())

    [[], [1], [], [2], [3]]
    None
    []
    []
    [1]
    [2]
    [3]
    Like foreach, but the function is called once per partition.
    reduce

    x = sc.parallelize([1,2,3])
    y = x.reduce(lambda obj, accumulated: obj + accumulated) # computes a cumulative sum
    print(x.collect())
    print(y)

    [1, 2, 3]
    6
    Combines all elements into a single value with the given function (here a cumulative sum).
    fold

    x = sc.parallelize([1,2,3])
    neutral_zero_value = 0 # 0 for sum, 1 for multiplication
    y = x.fold(neutral_zero_value,lambda obj, accumulated: accumulated + obj) # computes cumulative sum
    print(x.collect())
    print(y)

    [1, 2, 3]
    6
    Like reduce, but starts from the supplied neutral zero value (applied within each partition and again when merging the partition results).
    aggregate

    x = sc.parallelize([2,3,4])
    neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x
    seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el))
    combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1]))
    y = x.aggregate(neutral_zero_value,seqOp,combOp) # computes (cumulative sum, cumulative product)
    print(x.collect())
    print(y)
    [2, 3, 4]
    (9, 24)
    Aggregates into a result type different from the element type: seqOp folds each element into the accumulator within a partition, combOp merges the per-partition accumulators. For [2,3,4] on a single partition the accumulator evolves (0,1) -> (2,2) -> (5,6) -> (9,24).
    histogram

    histogram (example #1)

    x = sc.parallelize([1,3,1,2,3])
    y = x.histogram(buckets = 2)
    print(x.collect())
    print(y)

    [1, 3, 1, 2, 3]
    ([1, 2, 3], [2, 3])

    histogram (example #2)

    x = sc.parallelize([1,3,1,2,3])
    y = x.histogram([0,0.5,1,1.5,2,2.5,3,3.5])
    print(x.collect())
    print(y)

    [1, 3, 1, 2, 3]
    ([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5], [0, 0, 2, 0, 1, 0, 2])
    The first element of the result is the bucket boundaries; the second is the number of values falling into each bucket.
    variance

    x = sc.parallelize([1,3,2])
    y = x.variance() # divides by N
    print(x.collect())
    print(y)
    [1, 3, 2]
    0.666666666667
    Population variance (divides by N).
    stdev

    x = sc.parallelize([1,3,2])
    y = x.stdev() # divides by N
    print(x.collect())
    print(y)

    [1, 3, 2]
    0.816496580928
    Population standard deviation (divides by N).

    sampleStdev

    x = sc.parallelize([1,3,2])
    y = x.sampleStdev() # divides by N-1
    print(x.collect())
    print(y)
    [1, 3, 2]
    1.0
    Sample standard deviation; the divisor is N-1.
    countByValue

    x = sc.parallelize([1,3,1,2,3])
    y = x.countByValue()
    print(x.collect())
    print(y)

    [1, 3, 1, 2, 3]
    defaultdict(<type 'int'>, {1: 2, 2: 1, 3: 2})
    top

    x = sc.parallelize([1,3,1,2,3])
    y = x.top(num = 3)
    print(x.collect())
    print(y)

    [1, 3, 1, 2, 3]
    [3, 3, 2]
    Returns the num largest elements, in descending order.
    takeOrdered

    x = sc.parallelize([1,3,1,2,3])
    y = x.takeOrdered(num = 3)
    print(x.collect())
    print(y)

    [1, 3, 1, 2, 3]
    [1, 1, 2]
    Returns the num smallest elements, in ascending order.
    take

    x = sc.parallelize([1,3,1,2,3])
    y = x.take(num = 3)
    print(x.collect())
    print(y)

    [1, 3, 1, 2, 3]
    [1, 3, 1]
    Takes the first num elements without sorting.
    first

    x = sc.parallelize([1,3,1,2,3])
    y = x.first()
    print(x.collect())
    print(y)

    [1, 3, 1, 2, 3]
    1
    Returns the first element.
    collectAsMap

    x = sc.parallelize([('C',3),('A',1),('B',2)])
    y = x.collectAsMap()
    print(x.collect())
    print(y)

    [('C', 3), ('A', 1), ('B', 2)]
    {'A': 1, 'C': 3, 'B': 2}
    Collects the pair RDD into a dict on the driver; if a key occurs more than once, later values overwrite earlier ones. See the sketch below.
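    A minimal sketch of the duplicate-key behavior (assumed example, not from the original post):

    x = sc.parallelize([('A',1),('A',2)])
    print(x.collectAsMap()) # {'A': 2} -- the later value wins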
    keys

    x = sc.parallelize([('C',3),('A',1),('B',2)])
    y = x.keys()
    print(x.collect())
    print(y.collect())

    [('C', 3), ('A', 1), ('B', 2)]
    ['C', 'A', 'B']
    Extracts only the keys.
    values
    Extracts only the values. See the sketch below.
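    A minimal sketch, mirroring the keys example above:

    x = sc.parallelize([('C',3),('A',1),('B',2)])
    y = x.values()
    print(y.collect())

    [3, 1, 2]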
    countByKey

    x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
    y = x.countByKey()
    print(x.collect())
    print(y)

    [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
    defaultdict(<type 'int'>, {'A': 3, 'B': 2})
    Counts the occurrences of each key and returns the result as a dict on the driver.
    join

    x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
    y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
    z = x.join(y)
    print(x.collect())
    print(y.collect())
    print(z.collect())

    [('C', 4), ('B', 3), ('A', 2), ('A', 1)]
    [('A', 8), ('B', 7), ('A', 6), ('D', 5)]
    [('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]
    Inner join on keys: every combination of matching values is emitted, and keys present in only one RDD are dropped.
    leftOuterJoin
    rightOuterJoin
    Like join, but every key of the left (respectively right) RDD is kept; keys with no match on the other side are paired with None. See the sketch below.
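    A minimal sketch reusing the x and y RDDs from the join example above (output order may vary):

    z = x.leftOuterJoin(y)
    print(z.collect())

    [('C', (4, None)), ('B', (3, 7)), ('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6))]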
    partitionBy

    x = sc.parallelize([(0,1),(1,2),(2,3)],2)
    y = x.partitionBy(numPartitions = 3, partitionFunc = lambda x: x) # only the key is passed to partitionFunc
    print(x.glom().collect())
    print(y.glom().collect())

    [[(0, 1)], [(1, 2), (2, 3)]]
    [[(0, 1)], [(1, 2)], [(2, 3)]]
    Repartitions the pair RDD by key; glom shows each resulting partition as a list.
    combineByKey

    x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
    createCombiner = (lambda el: [(el,el**2)])
    mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
    mergeComb = (lambda agg1,agg2: agg1 + agg2 ) # append agg1 with agg2
    y = x.combineByKey(createCombiner,mergeVal,mergeComb)
    print(x.collect())
    print(y.collect())

    [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
    [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
    Merges the values of each key: createCombiner builds the initial accumulator, mergeVal folds in further values, and mergeComb merges accumulators across partitions.
    aggregateByKey

    x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
    zeroValue = [] # empty list is 'zero value' for append operation
    mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])
    mergeComb = (lambda agg1,agg2: agg1 + agg2 )
    y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
    print(x.collect())
    print(y.collect())

    [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
    [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
    Aggregates the values of each key, starting from the given zero value.
    foldByKey

    x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
    zeroValue = 1 # one is 'zero value' for multiplication
    y = x.foldByKey(zeroValue,lambda agg,x: agg*x ) # computes cumulative product within each key
    print(x.collect())
    print(y.collect())

    [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
    [('A', 60), ('B', 2)]
    Folds the values of each key, starting from the given zero value.
    groupByKey

    x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
    y = x.groupByKey()
    print(x.collect())
    print([(j[0],[i for i in j[1]]) for j in y.collect()])

    [('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
    [('A', [3, 2, 1]), ('B', [5, 4])]
    flatMapValues

    x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
    y = x.flatMapValues(lambda x: [i**2 for i in x]) # function is applied to entire value, then result is flattened
    print(x.collect())
    print(y.collect())

    [('A', (1, 2, 3)), ('B', (4, 5))]
    [('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]
    Applies the function to each value and flattens the result, repeating the key for every output element.
    mapValues
    Applies the function to each value only; the keys are left unchanged. See the sketch below.
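    A minimal sketch (assumed example, not from the original post), contrasting with flatMapValues above:

    x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
    y = x.mapValues(lambda v: [i**2 for i in v]) # one output value per key, not flattened
    print(y.collect())

    [('A', [1, 4, 9]), ('B', [16, 25])]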
    groupWith

    x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
    y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])
    z = sc.parallelize([('D',9),('B',(8,8))])
    a = x.groupWith(y,z)
    print(x.collect())
    print(y.collect())
    print(z.collect())
    print("Result:")
    for key,val in list(a.collect()):
        print(key, [list(i) for i in val])

    [('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
    [('B', (7, 7)), ('A', 6), ('D', (5, 5))]
    [('D', 9), ('B', (8, 8))]
    Result:
    D [[], [(5, 5)], [9]]
    C [[4], [], []]
    B [[(3, 3)], [(7, 7)], [(8, 8)]]
    A [[2, (1, 1)], [6], []]
