
Using Spark from Python: Basic Usage

Author: 枫隐_5f5f | Published 2019-04-11 17:30

    PySpark API documentation

    Input file contents:

    zhangsan,77,88,99
    lisi,56,78,89
    wanger,78,77,67
    

    1. map

    A one-to-one transformation: applies the given function to each element of the RDD and returns a new RDD of the results.

    import sys
    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("api_test")
    sc = SparkContext(conf=conf)
    
    lines = sc.textFile("/test_file")
    
    def sp(line):
            ss = line.strip().split(",")
            name =  str(ss[0])
            eng = str(ss[1])
            math = str(ss[2])
            lang = str(ss[3])
            return name,eng,math,lang
    
    rdd = lines.map(sp).collect()
    print (rdd)
    #[('zhangsan', '77', '88', '99'), ('lisi', '56', '78', '89'), ('wanger', '78', '77', '67')]
    
    for line in rdd:
            print (line)
    
    ('zhangsan', '77', '88', '99')
    ('lisi', '56', '78', '89')
    ('wanger', '78', '77', '67')
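
    For comparison, map also accepts an inline lambda. A minimal sketch (reusing the same sc as above, with a made-up list of numbers) that squares each element one-to-one:

    nums = sc.parallelize([1,2,3,4])
    squared = nums.map(lambda x: x*x).collect()
    print (squared)
    #[1, 4, 9, 16]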
    

    2. flatMap

    A one-to-many transformation: splits each input record into multiple output records according to the given function and returns a new RDD containing all of them.

    import sys
    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("api_test")
    sc = SparkContext(conf=conf)
    
    def flatMap_func(x):
            return x.strip().split(",")
    
    lines = sc.textFile("/test_file")
    rdd = lines.flatMap(flatMap_func).collect()
    print (rdd)
    #reads the file line by line and splits each line on "," into individual tokens
    [u'zhangsan', u'77', u'88', u'99', u'lisi', u'56', u'78', u'89', u'wanger', u'78', u'77', u'67']
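
    The difference from map is easiest to see side by side. A small sketch (same sc, made-up data): map keeps one output element per input element, so the lists stay nested, while flatMap flattens them.

    words = sc.parallelize(["a,b","c,d"])
    print (words.map(lambda x:x.split(",")).collect())
    #[['a', 'b'], ['c', 'd']]
    print (words.flatMap(lambda x:x.split(",")).collect())
    #['a', 'b', 'c', 'd']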
    

    3. filter

    Drops the elements that do not satisfy the predicate and returns a new RDD. The predicate is called with a single element and should return a truthy or falsy value.

    import sys
    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("api_test")
    sc = SparkContext(conf=conf)
    
    
    def sp(line):
            ss = line.strip().split(",")
            name =  str(ss[0])
            eng = int(ss[1])
            math = int(ss[2])
            lang = int(ss[3])
            return name,eng,math,lang
    
    
    def flatMap_func(x):
            return x.strip().split(",")
    
    def filter_func(x):
            #keep only the records whose lang score is greater than 80
            return x[3] > 80
    
    lines = sc.textFile("/test_file")
    rdd = lines.map(sp) \
            .filter(filter_func)\
            .collect()
    
    print (rdd)
    
    #records with a lang score of 80 or below have been filtered out
    [('zhangsan', 77, 88, 99), ('lisi', 56, 78, 89)]
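
    The predicate can also be written as an inline lambda. A sketch reusing lines and sp from above; count() is used here just to show how many records survive the filter.

    passed = lines.map(sp).filter(lambda x:x[3] > 80)
    print (passed.count())
    #2
    print (passed.collect())
    #[('zhangsan', 77, 88, 99), ('lisi', 56, 78, 89)]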
    

    4. reduce

    Aggregates all elements of the RDD into a single value using the given binary function.

    import sys
    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("api_test")
    sc = SparkContext(conf=conf)
    
    li = [1,2,3,4,5]
    vec = sc.parallelize(li)
    
    rdd = vec.reduce(lambda x,y:int(x)+int(y))
    print (rdd)
    #15
    
    li = ["asd","asd","word"]
    vec = sc.parallelize(li)
    
    rdd = vec.reduce(lambda x,y:x+y)
    print (rdd)
    #asdasdword
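
    Note that reduce is an action and needs at least one element in the RDD. For plain sums there are related calls; a small sketch (same sc), where fold behaves like reduce but takes an explicit zero value:

    vec = sc.parallelize([1,2,3,4,5])
    print (vec.sum())
    #15
    print (vec.fold(0, lambda x,y:x+y))
    #15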
    

    5. countByValue

    Counts how many times each distinct element occurs in the RDD and returns the counts to the driver as a dict.

    import sys
    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("api_test")
    sc = SparkContext(conf=conf)
    
    li = ["asd","ad","asd","hello","word","word"]
    vec = sc.parallelize(li)
    
    re = vec.countByValue()
    print (re)
    #defaultdict(<type 'int'>, {'asd': 2, 'word': 2, 'hello': 1, 'ad': 1})
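
    countByValue brings the whole result dict back to the driver, so it is only suitable when the number of distinct values is small. A sketch of the distributed equivalent, reusing the vec RDD built above, which keeps the counts in an RDD instead:

    counts = vec.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
    print (counts.collect())
    #[('asd', 2), ('ad', 1), ('hello', 1), ('word', 2)]  (order may vary)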
    

    6. reduceByKey

    Aggregates the values of each key using a user-defined function and returns a new (key, value) RDD.

    import sys
    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("api_test")
    sc = SparkContext(conf=conf)
    
    
    def sp(line):
            ss = line.strip().split(",")
            name =  str(ss[0])
            eng = int(ss[1])
            math = int(ss[2])
            lang = int(ss[3])
            return name,eng,math,lang
    
    
    def flatMap_func(x):
            return x.strip().split(",")
    
    def filter_func(x):
            #keep only the records whose lang score is greater than 80
            return x[3] > 80
    
    lines = sc.textFile("/test_file")
    rdd = lines.flatMap(flatMap_func) \
            .map(lambda x:(x,1)) \
            .reduceByKey(lambda x,y:x+y) \
            .collect()
    
    print (rdd)
    
    #[(u'77', 2), (u'wanger', 1), (u'56', 1), (u'99', 1), (u'lisi', 1), (u'88', 1), (u'89', 1), (u'67', 1), (u'zhangsan', 1), (u'78', 2)]
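
    A common reduceByKey pattern is a per-key average computed from (sum, count) pairs. The sketch below uses a made-up list of (name, score) pairs and the same sc:

    scores = sc.parallelize([("zhangsan",77),("zhangsan",88),("lisi",56),("lisi",78)])
    avg = scores.map(lambda kv:(kv[0],(kv[1],1))) \
            .reduceByKey(lambda a,b:(a[0]+b[0],a[1]+b[1])) \
            .mapValues(lambda s:float(s[0])/s[1])
    print (avg.collect())
    #[('zhangsan', 82.5), ('lisi', 67.0)]  (order may vary)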
    

    7. sortBy

    Sorts the RDD by the key extracted from each element by the given function; the second argument selects ascending (True) or descending (False) order.

    from pyspark import SparkConf,SparkContext
    import sys
    
    #Python 2 only: set utf-8 as the default string encoding
    reload(sys)
    sys.setdefaultencoding("utf8")
    
    
    conf = SparkConf().setAppName("Name").setMaster("local")
    sc = SparkContext(conf=conf)
    
    infile_path = "file:///home/njliu/prc/pyspark/RDD/The_Man_of_Property.txt"
    infile = sc.textFile(infile_path)
    re = infile.flatMap(lambda l:l.strip().split(" ")) \
            .map(lambda x:(x,1)) \
            .reduceByKey(lambda x,y:x + y) \
            .sortBy(lambda x:x[1],False)
    
    for line in re.collect():
            print ("\t".join([line[0],str(line[1])]))
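
    When only the top N words are needed, takeOrdered avoids collecting and sorting the full result on the driver. A sketch reusing the word-count RDD re built above:

    top10 = re.takeOrdered(10, key=lambda x:-x[1])
    for word,cnt in top10:
            print ("\t".join([word,str(cnt)]))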
    
    

    8. groupByKey

    Groups all values that share a key into a single iterable and returns a new (key, iterable) RDD.

    from pyspark import SparkConf,SparkContext
    
    def sub_process(k,v):
            #collect the (item, score) tuples of one key, sort them by score in
            #descending order, and prepend the key to the resulting list
            tmp_list = []
            for tu in v:
                    tmp_list.append(tu)
            res_list = sorted(tmp_list,key=lambda x:float(x[1]),reverse=True)
            res_list.insert(0,k)
            return res_list
    
    if __name__ == "__main__":
            conf = SparkConf().setAppName("A Name").setMaster("local")
            sc = SparkContext(conf=conf)
    
            infiles = sc.textFile("file:///home/njliu/prc/pyspark/RDD/uis.data")
            res = infiles.map(lambda line:line.strip().split("\t")) \
                    .filter(lambda x:float(x[2]) > 1.5) \
                    .map(lambda x:(x[0],(str(x[1]),str(x[2])))) \
                    .groupByKey() \
                    .map(lambda (k,v):sub_process(k,v))
    
            for line in res.collect():
                    print (line)
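
    groupByKey materializes all values of a key on one worker, so for simple aggregations reduceByKey is usually preferred (it combines values before the shuffle). When the grouped values themselves are needed, mapValues(list) is a common shortcut; a sketch reusing infiles from above:

    grouped = infiles.map(lambda line:line.strip().split("\t")) \
            .map(lambda x:(x[0],x[1])) \
            .groupByKey() \
            .mapValues(list)
    for line in grouped.collect():
            print (line)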
    
    
