美文网首页我爱编程程序员大数据,机器学习,人工智能
Spark入门教程(五)创建弹性分布式数据集Rdd以及Trans

Spark入门教程(五)创建弹性分布式数据集Rdd以及Trans

作者: 胖滚猪学编程 | 来源:发表于2018-02-19 13:59 被阅读0次

    本文全部手写原创,请勿复制粘贴、转载请注明出处,谢谢配合!

    什么是弹性分布式数据集Rdd?


    • 概念:RDD(Resilient Distributed Datasets)简单来说,就是Spark中元素的集合,如数组、集合、文本等等都能称作RDD,但是和普通数据集相比,它有个特点就是:分布式。每个RDD都有多个分区,这些分区分布在集群不同的节点中。
    • 优势:其实只要理解了分布式的优势就不难理解Rdd的优势了,Spark会自动将Rdd的数据分发到集群上,将操作并行化执行。比如一个非常大的数据量(PB级),一台物理机处理肯定是不行的(除非是超级计算机但那非常昂贵),所以分布到多台机器上处理,不仅提高了处理速度还廉价许多。
      (如果还是不清楚没有关系,下面会演示几个案例,建议自己手动敲一敲代码感受一下就懂了)
    • Spark中对数据的所有操作不外乎:RDD create(创建) 、RDD transformation(转换)、RDD action(行动)

    Rdd的创建


    创建Rdd有两种方式:

    • (1)从外部存储系统(如共享文件系统,HDFS,HBase等)中引用数据集。使用textFile()方法

      //从本地文件系统读取数据集
      scala> val rdd =sc.textFile("file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md")
      rdd: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md MapPartitionsRDD[1] at textFile at <console>:24
       //从HDFS中读取数据集
      scala> val rdd1 =sc.textFile("hdfs://master:8020/user/test.rdd")
      rdd1: org.apache.spark.rdd.RDD[String] = hdfs://master:8020/user/test.rdd MapPartitionsRDD[5] at textFile at <console>:24
      
      //打印操作 注:collect()是将所有元素先收集到一台节点上,有可能会内存溢出,如果只是为了打印部分元素,建议使用take()
        scala> rdd1.collect().foreach(println)
        hello
        spark
        love
        you
      
        scala> rdd1.take(2)
        res5: Array[String] = Array(hello, spark)
      
    • (2)从驱动程序中的现有集合(如List,Array等),使用parallelize()方法

        //区间转Rdd
        scala>val rdd1 = sc.parallelize(1 to 5)
        rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
      
        scala>rdd1.collect().foreach(println)
        1
        2
        3
        4
        5
      
        //List集合转Rdd
        scala> val list = List(1,"a",'b')
        list: List[Any] = List(1, a, b)
      
        scala> sc.parallelize(list)
        res12: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[10] at parallelize at <console>:27
      
        scala> res12.collect().foreach(println)
        1
        a
        b
      
        //Array数组转Rdd
        scala> val array = Array("a","b")
        array: Array[String] = Array(a, b)
      
        scala> sc.parallelize(array)
        res8: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:27
      
        scala> res8.collect().foreach(println)
        a
        b
      

    Rdd的转换 transformation


    转换操作就是从现有的Rdd生成一个新的Rdd的操作,比如filter操作,它从现有的Rdd筛选出符合条件的数据,创建一个新的Rdd。
    还是那句话,概念不多说,看实际操作最为直观!

    单个Rdd操作

    • map(func) 对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD
    scala> val rdd = sc.parallelize(1 to 5)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
    
    scala> rdd.map(x=>(x+2))
    res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:27
    
    scala> res14.collect()
    res15: Array[Int] = Array(3, 4, 5, 6, 7)
    
    scala> rdd.map(x=>(x,1))
    res18: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[14] at map at <console>:27
    
    scala> res18.collect()
    res19: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
    
    • flatMap(func) 和map类似,但是它会把map“扁平化”,每个输入项可以映射到0个或更多个输出项。常用于统计单词数。举例对比一下和map的区别更容易理解:
    scala> val rdd = sc.textFile("hdfs://master/user/spark.hello")
    rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/spark.hello MapPartitionsRDD[16] at textFile at <console>:24
    
    scala> rdd.map(line => line.split(" "))
    res20: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at map at <console>:27
    
    scala> res20.collect()
    res22: Array[Array[String]] = Array(Array(hello, spark, it, is, perfect), Array(i, want, to, learn))
    
    scala> rdd.flatMap(line => line.split(" "))
    res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:27
    
    scala> res0.collect()
    res1: Array[String] = Array(hello, spark, it, is, perfect, i, want, to, learn)
    

    map输出是Array[Array[String]]、flatMap输出是Array[String],等于将map再次打平。

    • reduceByKey(func,[numTask]) 根据函数定义的规则,按照key进行整合,类似MapReduce中的Reduce操作。比如,在map阶段(spark,1) (spark,1),定义方法(x,y)=>(x+y),即value进行加法运算,1+1 所以使用reduceByKey后变为(spark,2)
    [root@master hadoop-2.6.0-cdh5.11.1]# hadoop fs -cat /user/wordcount.test
    spark i love you
    spark i want learn you
    

    wordcount案例:

    scala> val rdd = sc.textFile("hdfs://master/user/wordcount.test")
    rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/wordcount.test MapPartitionsRDD[4] at textFile at <console>:24
    
    scala> val wordmap = rdd.flatMap(line=>line.split(" ")).map(x=>(x,1))
    wordmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:26
    
    scala> wordmap.collect()
    res2: Array[(String, Int)] = Array((spark,1), (i,1), (love,1), (you,1), (spark,1), (i,1), (want,1), (learn,1), (you,1))
    
    scala> val wordreduce = wordmap.reduceByKey((x,y)=>(x+y))
    wordreduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:28
    
    scala> wordreduce.collect()
    res3: Array[(String, Int)] = Array((learn,1), (spark,2), (you,2), (love,1), (i,2), (want,1))
    
    • filter(func) 很好理解,就是过滤的意思
    scala> val rdd = sc.parallelize(Array(1,2,3))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
    
    scala> rdd.filter( x => (x==2)).collect()
    res8: Array[Int] = Array(2)
    
    scala> val rdd = sc.parallelize(List("sp","sa","vv"))
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24
    
    scala> rdd.filter(x=>x.contains('s')).collect()
    res10: Array[String] = Array(sp, sa)
    
    • groupByKey([numTasks]) 学过SQL的应该都知道group by,这里一样的意思,就是按照key来进行分组。numTasks是可选的,用来设置任务数量。
    scala> val rdd = sc.parallelize(List((1,"sp"),(1,"sa"),(2,"vv")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24
    
    scala> rdd.groupByKey().collect()
    res12: Array[(Int, Iterable[String])] = Array((1,CompactBuffer(sp, sa)), (2,CompactBuffer(vv)))
    
    • distinct([numTasks]) 去除重复元素,但注意:开销大,需要将所有数据通过网络传输进行Shuffle混洗
    scala> val rdd = sc.parallelize(List("sp","sp","vv"))
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24
    
    scala> rdd.distinct().collect()
    res13: Array[String] = Array(sp, vv)
    
    • sortByKey([ascending], [numTasks]) 排序,ascending表示递增的,默认ascending为true 即按递增排序 可以改为false,也可以自定义排序函数。
    scala> val rdd = sc.parallelize(List((1,"sp"),(3,"sa"),(2,"vv")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[21] at parallelize at <console>:24
    
    scala> rdd.sortByKey(false).collect()
    res14: Array[(Int, String)] = Array((3,sa), (2,vv), (1,sp))
    
    • 还有很多很多函数,大家可以参考:SparkAPI(请依据自己的版本号,这个是2.2.0的)
      官方其实说的很详细,但是很多同学可能没有scala基础就会有点看不懂,举个例子解释一下,其他大同小异。比如:
      API截图
      zeroValue是一个初始的给定值,V是范型的意思,可以是Int型等等,但是这个类型必须和RDD中的V一样。
      比如下例:zeroValue为3,是int型,rdd中的List(K,V) ,V也是int型,然后再根据func定义的规则对这两个V进行操作。最后返回RDD[(K,V)] 。
      3(初始值)+1+2=6 即(1,6)。 3+5+6=14 即(2,14) 建议:((x,y) => x+y)可以简写为(+) scala语言真的非常优雅!
    scala> val rdd = sc.parallelize(List((1,1),(1,2),(2,6),(2,5)))
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:24
    
    scala> rdd.foldByKey(3)(_+_).collect()
    res15: Array[(Int, Int)] = Array((1,6), (2,14))
    

    多个Rdd操作

    • 交集差集并集
    scala> val list1 = sc.parallelize(List("spark","spark","hello"))
    list1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:24
    
    scala> val list2 = sc.parallelize(List("spark","love","you"))
    list2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:24
    
    //并集不去重
    scala> list1.union(list2).collect()
    res16: Array[String] = Array(spark, spark, hello, spark, love, you)
    
    //交集去重
    scala> list1.intersection(list2).collect()
    res17: Array[String] = Array(spark)
    
    //差集(只存在于第一个dataset、不存在于第二个)不去重
    scala> list1.subtract(list2).collect()
    res18: Array[String] = Array(hello)
    
    • Join
    scala> val list1 = sc.parallelize(List((1,"spark"),(2,"spark"),(3,"hello")))
    list1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:24
    
    scala> val list2 = sc.parallelize(List((1,"spark"),(3,"you"),(4,"good")))
    list2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[39] at parallelize at <console>:24
    //内连接
    scala> list1.join(list2).collect()
    res20: Array[(Int, (String, String))] = Array((1,(spark,spark)), (3,(hello,you)))
        
    // 左外连接,左边rdd全部显示,右边没有的补null
    scala> list1.leftOuterJoin(list2).collect()
    res21: Array[(Int, (String, Option[String]))] = Array((1,(spark,Some(spark))), (3,(hello,Some(you))), (2,(spark,None)))
    
    // 右外连接,右边rdd全部显示,左边没有的补null    
    scala> list1.rightOuterJoin(list2).collect()
    res22: Array[(Int, (Option[String], String))] = Array((4,(None,good)), (1,(Some(spark),spark)), (3,(Some(hello),you)))
    

    最后再强调一句,函数太多了,此文只列举了常见的,最好的办法就是直接看官方API !!
    Spark官方文档
    SparkAPI

    相关文章

      网友评论

        本文标题:Spark入门教程(五)创建弹性分布式数据集Rdd以及Trans

        本文链接:https://www.haomeiwen.com/subject/eimitftx.html