美文网首页Spark 应用
将代码从 spark 1.x 移植到 spark 2.x

将代码从 spark 1.x 移植到 spark 2.x

作者: breeze_lsw | 来源:发表于2016-09-09 23:11 被阅读820次

    1. SparkSession

    sparkSession可以视为sqlContexthiveContext以及StreamingContext的结合体,这些Context的API都可以通过sparkSession使用。

    创建SparkSession

    val spark = SparkSession.builder
        .master("local[2]")
        .appName("spark session example")
        .getOrCreate()
    

    使用enableHiveSupport就能够支持hive,相当于hiveContext

    val spark = SparkSession.builder
        .master("local[2]")
        .appName("spark session example")
        .enableHiveSupport()
        .getOrCreate()
    

    API操作,与之前的Context基本一致

    //读取csv数据
    val df0 = spark.read
      .option("header","true")
      .csv("src/main/resources/test.csv")
    
    //读取parquet数据
    val df1 = spark.read.parquet("...")
    
    //读取json数据
    val df2 = spark.read.json("...")
    
    //sql查询
    val df3 = spark.sql("xxx")
    
    

    Spark 2.0向后兼容,所以hiveContext以及sqlContext依旧可用,不过官方建议开发者开始使用SparkSession

    2. DataSet,RDD,DataFrame

    • RDD

      类型安全,面向对象编程风格,序列化以及反序列化开销大。

    • DataFrame

      提供了查询优化器,分区剪枝,自动判断是否使用broadcast join等功能,对rdd进行了大量的优化。对spark了解不深的编程/分析人员非常友好。

      可以视为Row类型的Dataset (Dataset[Row]),非类型安全,不是面向对象编程风格。

    • DataSet

      继承了RDD和DataFrame的优点。数据以编码的二进制形式存储,将对象的schema映射为SparkSQL类型,不需要反序列化就可以进行shuffle等操作,每条记录存储的则是强类型值,类型安全,面向对象编程风格。

    Dataset的创建

    dataset可以从rdd,dataFrame转化,也可以从原始数据直接生成。

    通过toDS方法创建

    val ds1 = Seq("a","b").toDS()
    ds1.show
    
    //+-----+
    //|value|
    //+-----+
    //|    a|
    //|    b|
    //+-----+
    

    通过createDataSet创建

    case class Person(name: String, age: Int)
    val data = Seq(Person("lsw", 23), Person("yyy", 22))
    val ds2 = spark.createDataset(data)
    ds2.show
    
    //+----+---+
    //|name|age|
    //+----+---+
    //| lsw| 23|
    //| yyy| 22|
    //+----+---+
    

    DataSet与RDD使用上的区别

    Dataset 结合了 rdd 和 DataFrame 上大多数的API,所以spark 1.x基于 rdd 或 DataFrame 的代码可以很容易的改写成spark 2.x版本

    1. 数据读取

      RDDs

      sparkContext.textFile("/path/to/data.txt")
      

      Datasets

      //返回 DataFrame
      val df = spark.read.text("/path/to/data.txt")
      //返回 DataSet[String]
      val ds1 = spark.read.textFile("/path/to/data.txt")
      //或者读取成DataFram再转化成Dataset
      val ds2 = spark.read.text("/path/to/data.txt").as[String]
      
    2. 常用API

      RDDs

      //flatMap,filter
      val lines = sc.textFile("/path/to/data.txt")
      val res = lines
        .flatMap(_.split(" "))
        .filter(_ != "")
      
      //reduce
      val rdd = sc.makeRDD(Seq(1, 2, 3, 4))
      rdd.reduce((a, b) => a + b)
      

      Datasets

      //flatMap,filter
      val lines = spark.read.textFile("/path/to/data.txt")
      val res = lines
        .flatMap(_.split(" "))
        .filter(_ != "")
      
      //reduce
      val ds = Seq(1, 2, 3, 4).toDs
      ds.reduce((a, b) => a + b)
      
    3. reduceByKey

      RDDs

      val reduceCountByRDD = wordsPair
        .reduceByKey(_+_)
      

      Datasets

      val reduceCountByDs = wordsPairDs
        .mapGroups((key,values) =>(key,values.length))
      
    4. RDD,DataFrame,Dataset的相互转化

      import spark.implicits._
      //Dataset转化为RDD
      val ds2rdd = ds.rdd
      //Dataset转为DataFrame
      val ds2df = ds.toDF
      
      //RDD转化为Dataset
      val rdd2ds = rdd.toDS
      //RDD转化为DataFrame
      val rdd2df = rdd.toDF
      
      //DataFrame转化为RDD
      val df2rdd = df.rdd
      //DataFrame转化为DataSet
      val df2ds = df.as[Type]
      
      
    5. wordCount

      data.txt

      hello world
      hello spark
      

      RDDs

      val rdd = sc.textFile("src/main/resources/data.txt")
      val wordCount = rdd
        .map(word => (word,1))
        .reduceByKey(_+_)
      

      Datasets

      import spark.implicits._
      val wordCount1 = lines
        .flatMap(r => r.split(" "))
        .groupByKey(r => r)
        .mapGroups((k, v) => (k, v.length))
      wordCount1.show
      //  +-----+--------+
      //  |value|count(1)| 
      //  +-----+--------+
      //  |hello|       2|
      //  |spark|       1|
      //  |world|       1|
      //  +-----+--------+
        
      //也可以直接使用count函数
      val wordCount2 = lines
        .flatMap(r => r.split(" "))
        .groupByKey(v => v)
        .count()
      wordCount2.show
      //  +-----+---+
      //  |   _1| _2|
      //  +-----+---+
      //  |hello|  2|
      //  |spark|  1|
      //  |world|  1|
      //  +-----+---+
      

    Dataset性能提升(来自官方)

    这里写图片描述
    这里写图片描述
    这里写图片描述

    3.Catalog

    Spark 2.0中添加了标准的API(称为catalog)来访问Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。

    获取catalog

    从SparkSession中获取catalog

    val catalog = spark.catalog
    

    查询临时表和元数据中的表

    返回Dataset[Table]

    catalog.listTable.show
    //  +----+--------+-----------+---------+-----------+
    //  |name|database|description|tableType|isTemporary|
    //  +----+--------+-----------+---------+-----------+
    //  |table|   null|      null|TEMPORARY|        true|
    //  |act | default|      null| EXTERNAL|       false|
    //  +----+--------+-----------+---------+-----------+
    

    创建临时表

    使用createTempViewcreateOrReplaceTempView取代registerTempTable

    例如

    df.createTempView("table")
    df.createOrReplaceTempView("table")
    
    • createTempView

      创建临时表,如果已存在同名表则报错。

    • createOrReplaceTempView

      创建临时表,如果存在则进行替换,与老版本的registerTempTable功能相同。

    销毁临时表

    使用dropTempView取代dropTempTable,销毁临时表的同事会清除缓存的数据。

    spark.dropTempView("table")
    

    缓存表

    对数据进行缓存

    //缓存表有两种方式
    df.cache
    catalog.cacheTable("table")
    
    //判断数据是否缓存
    catalog.isCached("table")
    

    catalog相较于之前的API,对metadata的操作更加的简单,直观。

    参考

    http://blog.madhukaraphatak.com/categories/spark-two/
    http://www.jianshu.com/p/c0181667daa0
    https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

    相关文章

      网友评论

      本文标题:将代码从 spark 1.x 移植到 spark 2.x

      本文链接:https://www.haomeiwen.com/subject/rvhyettx.html