Spark Chapter 4 RDD 常用算子

作者: 深海suke | 来源:发表于2019-08-12 21:47 被阅读0次

    # 1 RDD 常用操作

    *transformations*, which create a new dataset from an existing one

    RDDA ---transformations-->RDDB

    RDDB = RDDA.map(...)

    【面试重点】lazy——transformations 不会立刻计算内容,只会记录关系,只有遇到action才会进行计算。

    操作包含:map/filter/gtoup by/distinct

    *actions*, which return a value to the driver program after running a computation on the dataset.

    操作包含:count,reduce、collect/^

    【特点】

    * transformation are lazy,nothing actually happens until an action is called;

    * action triggers the computation;

    * action returns values to driver or writes data to external storage;

    # 2 Transformation算子

    ## 1map

    map(func):将func函数作用到每个数据集上面,生成一个新的分布式数据集返回

    ```

    def my_map2():

        data = ['cat','dog','tiger','lion']

        rdda = sc.parallelize(data)

        rddb = rdda.map(lambda x:(x,1))

        print(rddb.collection())

    ```

    ## 2filter

    选出所有func返回值为true的值的元素,生成一个新的分布式数据集返回

    ```

    def my_map():

        data = [1,2,3,4,5]

        rdd1 = sc.parallelize(data)

        rdd2 = rdd1.map(lambda x:x*2)

        rdd3 = rdd2.filter(lambda x:x>5)

        print(rdd3.collect())

    <==>

    # 链式编程

     print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())

    ```

    ## 3flatMap

    输入的item能够被map到0or多个items输出,返回值是一个Sequence

    ```

    def my_flatMap():

      data = ['hello spark','hello world','hello world']

      rdd = sc.parallelize(data)

      print(rdd.flatMap(lambda line:line.split(" ")).collection())

    ```

     不是很懂这个区别

    ## 4groupByKey

    把相同的Key的数据分发到一起

    ```

    def my_groupBy():

      data = ['hello spark','hello world','hello world']

      rdd = sc.parallelize(data)

      rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

      rdd2 = rdd1.groupByKey()

      print(rdd2.collect())#直接输出会输出spark的参数,无法直接输出值

      print(rdd2.map(lambda x:{x[0]:list(x[1])}).collect()) #应该使用这种方式实现数据输出

    ```

    ## 5reduceByKey

    把相同的Key的数据分发到一起,并进行相应的计算

    ```

    def my_groupBy():

      data = ['hello spark','hello world','hello world']

      rdd = sc.parallelize(data)

      rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

      rdd2 = rdd1.reduceByKey(lambda a,b:a+b)

      print(rdd2.collect())

    ```

    【面试考点:基础版本word count】

    ## 6) sortByKey()

    默认升序列

    需求1 :把wc的结果按照从大到小排列

    ```

    def my_groupBy():

      data = ['hello spark','hello world','hello world']

      rdd = sc.parallelize(data)

      rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

      rdd2 = rdd1.reduceByKey(lambda a,b:a+b)

      rdd3 = rdd2.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0]))

      print(rdd3.collect())

    ```

    ## 7) Union

    ```

    def my_union():

      a = sc.parallize([1,2,3])

      b = sc.parallize([2,3,4])

      a.union(b).collect()

    ```

    ## 8) distinct

    ```

    def my_distinct():

      a = sc.parallize([1,2,3])

      b = sc.parallize([2,3,4])

      a.union(b).distinct().collect()

    ```

    ## 9) join

    inner join

    outer join:left/right/full

    ```

    def my_join():

      a = sc.parallize([("A","a1"),("C","c1"),("D","d1")])

      b = sc.parallize([("B","b1"),("C","c2"),("C","c3")])

      a.join(b).collect() #这个表示内连接

      #结果[("C",("c1","c2")),("C",("c1","c3"))]

      a.leftOuterJoin(b).collect() #左外链接

      #结果:[("A",("a1",None)),("C",("c1","c2")),("C",("c1","c3")),("D",("d1",None))]

      a.RightOuterJoin(b).collect() #右外链接

      #结果:[("B",(None,"b1")),("C",("c1","c2")),("C",("c1","c3"))]

      a.fullOuterJoin(b).collection() #全连接

      #结果:[("A",("a1",None)),("B",(None,"b1")),("C",("c1","c2")),("C",("c1","c3")),("D",("d1",None))]

    ```

    # 3 Action算子

    ## 1) 常用算子

    collect、count、take、reduce、saveAsTextFile、foreach

    ## 2) 示例

    ```

    def my_action():

      data = [1,2,3,4,5,6,7]

      rdd = sc.parallize(data)

      rdd.collect() #显示

      rdd.count() #计数

      rdd.take(3) #取前三个[1,2,3]

      rdd.max()/rdd.min()/rdd.sum() #集合计算

      rdd.reduce(lambda a,b:a+b)

      rdd.foreach(lambda x:print(x))

      rdd.saveAsTextFile() #文件存储

    ```

    # 4 Spark RDD实战

    ## 1)词评统计案例,多角度迭代

    1)input:1个文件,多个文件(文件夹),不同后缀名

    2)开发步骤分析:

    文本内容的每一行转成一个个的单词:flatMap

    单词==>(单词,1):map

    把所有单词相同的计数相加,得到最终的结果:reduceByKey

    ### 版本1 写到控制台

    ```

    from pyshark import SparkConf,SparkContext

    if __name__ = "__main__":

        if len(sys.srgv)!=2:

            print("Usage:wordcount",file = sys.stderr)

        conf = SparkConf()

        sc = SparkContext(conf = conf)

        def printResult():

            counts = sc.textFile(sys.argv[1])\

                .flatMap(lambda line:line.split("\t"))\

                .map(lambda x:(x,1))\

                .reduceByKey(lambda a,b:a+b)

            output = counts.collect()

            for(word,count) in output:

                print("%s:%i"%(word,count))

        printResult()

        sc.stop()

    ```

    tips: 在脚本的参数中添加文本地址的参数 传入相关脚本参数

    在命令行提交代码:

    ```

    ./spark -submit --master local[2] --name spark0402 /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt

    ```

    支持文件匹配模式

    ```

    1 file:///home/hadoop/data/hello.txt

    2 file:///home/hadoop/data/

    3 file:///home/hadoop/data/*.txt

    ```

    tips:

    复制文件

    cp ../hello.txt 1

    cp ../hello.txt 2

    cp ../hello.txt 1.txt

    ### 版本2 保存到文件

    ```

    if len(sys.srgv)!=3:

        print("Usage:wordcount ",file = sys.stderr)

        sys.exit(-1)

    def saveFile():

        sc.textFile(sys.argv[1]) \

            .flatMap(lambda line: line.split("\t")) \

            .map(lambda x: (x, 1)) \

            .reduceByKey(lambda a, b: a + b).\

            .saveAsTextFile(sys.argv[2])

    ```

    tips: 查看分区的内容

    ```

    more part-0000*

    ```

    ### 作业:降序排列

    ## 2Top N 案例实战

    1)input:1/n文件文件夹后缀名

    2)求某个维度的topn

    3)开发步骤分析

    文本内容的每一行根据需求提出你所需要的字段

    单词==>(单词,1):map

    把所有单词相同的计数相加,得到最终的结果:reduceByKey

    取最多出现次数的降序:sortByKey

    ```

    counts = sc.textFile(sys.argv[1])\

            .flatMap(lambda line:line.split("\t"))\

             .map(lambda x:(x[5],1))\

             .reduceByKey(lambda a,b:a+b)\

             .map(lambda x:(x[1],x[0]))\

             .sortByKey()\

             .map(lambda x:(x[1],x[0])).take(5)

    for(word,count) in output:

      print("%s:%i"%(word,count))         

    ```

    思考:如果统计数据会发生数据倾斜

    ## 3)平均数案例实战

    开发步骤分析:

    1)取出年龄map

    2)计算年龄总和reduce

    3)计算记录综述count

    4)求平均数

    ```

    ageData = sc.textFile(sys.argv[1])\

            .flatMap(lambda x:x.split("\t")[1])\

    totalAge = ageData.map(age:int(age)).reduce(a,b:a+b)

    counts = ageData.count()       

    avgAge = totalAge/counts

    print(ageAge) 

    ```

    相关文章

      网友评论

        本文标题:Spark Chapter 4 RDD 常用算子

        本文链接:https://www.haomeiwen.com/subject/kgwpjctx.html