美文网首页
20 spark弹性分布式数据集

20 spark弹性分布式数据集

作者: 6cc89d7ec09f | 来源:发表于2018-03-10 14:40 被阅读7次

    RDD

    参考 http://blog.csdn.net/qq_16103331/article/details/53443890
    五大特性

    image.png
    两个动作
    对于RDD,有两种类型的动作,一种是Transformation,一种是Action。它们本质区别是:
    Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的 
    Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中
    
    所以我可以根据算子的返回类型来判断这个算子是Transformation还是action
    
    Transformations转换操作,返回值还是一个 RDD,如 map、 filter、 union; 
    Actions行动操作,返回结果或把RDD持久化起来,如 count、 collect、 save。
    

    lazy懒加载

    RDD是一个懒执行,直到action阶段才会真正执行
    

    三大操作方式


    image.png

    TransFormation函数


    image.png
    action函数
    image.png

    练习:

     val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
    val rdd2 = sc.textFile("file:///opt/datas/stu2.txt")
     val allRDD = rdd1.union(rdd2)  //融合
    allRDD.cache  //缓存到内存
    val lines = allRDD.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x => (x._2 > 1)).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("hdfs://bigdata-pro01.kfk.com:8020/user/kfk/wordcount")
    

    DataFrame

    操作大全 :http://blog.csdn.net/dabokele/article/details/52802150

    image.png
    测试:
    dataFrame show来展示
    toDF rdd转dataframe
    val rdd1 = sc.textFile("file:///opt/datas/stu.txt")
     val rdd2 = sc.textFile("file:///opt/datas/stu2.txt")
    allRDD = rdd1.union(rdd2)
    lines = allRDD.flatMap(x => s.split(" ")).map(x => (x,1)).reduceByKey((a,b) => (a+b)).filter(x => (x._2 > 1)).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).toDF //rdd转dataFrame
    lines.printSchema //打印元数据
    lines.select("_1","_2").show //select操作
    
    ##dataset转dataFrame,指定列名
    val dataFrame = spark.read.textFile("file:///opt/datas/stu.txt").flatMap(x => x.split(" ")).map(x => (x,1)).toDF("name","count")
    dataFrame.select("name","count").groupBy("name").count.show
    

    dataSet

    image.png

    创建方法

    spark.read.textFile
    

    练习

    //定义例类
    case class Person(username:String,count:Int) 
    //创建dataset
    val dataSet = spark.read.textFile("file:///opt/datas/stu.txt")  
    //dataset转换成类
    val res0 = dataSet.flatMap(x => x.split(" ")).map(x => (x,1)).map(x => (Person(x._1,x._2)))
    // 创建成表
    res0.createOrReplaceTempView("person")
    //分组计数并排序
    spark.sql("select username,count(1) from person group by username order by count(1) desc").show
    

    dataFrame转dataSet


    image.png
    //创建rdd
    val rdd = sc.textFile("file:///opt/datas/stu.txt")
    //转成dataframe
    val df = rdd.flatMap(x => x.split(" ")).map(x => (x,1)).reduceByKey((a,b) =>(a+b)).toDF("username","count")
    //转成dataset,前提有case class Person,且字段名一样
    val ds = df.as[Person]
    //降序排列
    ds.orderBy($"count".desc).show
    

    三大数据集之间的转成和对比

    image.png
    image.png
    image.png
    image.png
    image.png

    RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换

    DataFrame/Dataset转RDD:
    这个转换很简单

    val rdd1=testDF.rdd
    val rdd2=testDS.rdd
    

    RDD转DataFrame:

    import spark.implicits._
    val testDF = rdd.map {line=>
          (line._1,line._2)
        }.toDF("col1","col2")
    

    一般用元组把一行的数据写在一起,然后在toDF中指定字段名

    RDD转Dataset:

    import spark.implicits._
    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    val testDS = rdd.map {line=>
          Coltest(line._1,line._2)
        }.toDS
    

    可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

    Dataset转DataFrame:
    这个也很简单,因为只是把case class封装成Row

    import spark.implicits._
    val testDF = testDS.toDF
    

    DataFrame转Dataset:

    import spark.implicits._
    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    val testDS = testDF.as[Coltest]
    

    这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便

    特别注意:
    在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用

    相关文章

      网友评论

          本文标题:20 spark弹性分布式数据集

          本文链接:https://www.haomeiwen.com/subject/xjhmfftx.html