美文网首页Spark程序员
Spark-SQL之DataFrame基本操作

Spark-SQL之DataFrame基本操作

作者: 文哥的学习日记 | 来源:发表于2017-08-16 20:05 被阅读190次

    这篇文章将带大家一起学习Spark中DataFrame的基本操作。

    1、创建DataFrame

    本文所使用的DataFrame是通过读取mysql数据库获得的,代码如下:

    val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          .enableHiveSupport()
          //.config("spark.some.config.option", "some-value")
          .getOrCreate()
        import spark.implicits._
        val url = "jdbc:mysql://localhost:3306/test"
        val df = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", "pivot")
          .option("user", "root")
          .option("password", "admin")
          .load()
    

    2、DataFrame基本动作运算

    2.1 show展示数据

    可以用show() 方法来展示数据,show有以下几种不同的使用方式:
    show():显示所有数据
    show(n) :显示前n条数据
    show(true): 最多显示20个字符,默认为true
    show(false): 去除最多显示20个字符的限制
    show(n, true):显示前n条并最多显示20个自负

    代码为:

    df.show()
    df.show(3)
    df.show(true)
    df.show(false)
    df.show(3,true)
    

    上面的输出为:

    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    |  4|   2| 助手1|2017-08-07 13:44:...|
    |  5|   3|APP1|2017-08-02 13:44:...|
    |  6|   3|APP1|2017-08-01 13:44:...|
    |  7|   3| 助手2|2017-08-14 13:44:...|
    |  8|   3|APP2|2017-08-03 13:44:...|
    |  9|   2|APP2|2017-08-11 13:44:...|
    | 10|   2| 助手1|2017-07-14 13:44:...|
    | 11|   1|APP1|2017-07-15 13:45:...|
    | 12|   1| 助手2|2017-07-07 13:45:...|
    +---+----+----+--------------------+
    
    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    +---+----+----+--------------------+
    only showing top 3 rows
    
    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    |  4|   2| 助手1|2017-08-07 13:44:...|
    |  5|   3|APP1|2017-08-02 13:44:...|
    |  6|   3|APP1|2017-08-01 13:44:...|
    |  7|   3| 助手2|2017-08-14 13:44:...|
    |  8|   3|APP2|2017-08-03 13:44:...|
    |  9|   2|APP2|2017-08-11 13:44:...|
    | 10|   2| 助手1|2017-07-14 13:44:...|
    | 11|   1|APP1|2017-07-15 13:45:...|
    | 12|   1| 助手2|2017-07-07 13:45:...|
    +---+----+----+--------------------+
    
    +---+----+----+---------------------+
    |id |user|type|visittime            |
    +---+----+----+---------------------+
    |1  |1   |助手1 |2017-08-10 13:44:19.0|
    |2  |1   |APP1|2017-08-04 13:44:26.0|
    |3  |2   |助手1 |2017-08-05 13:44:29.0|
    |4  |2   |助手1 |2017-08-07 13:44:32.0|
    |5  |3   |APP1|2017-08-02 13:44:38.0|
    |6  |3   |APP1|2017-08-01 13:44:41.0|
    |7  |3   |助手2 |2017-08-14 13:44:48.0|
    |8  |3   |APP2|2017-08-03 13:44:45.0|
    |9  |2   |APP2|2017-08-11 13:44:53.0|
    |10 |2   |助手1 |2017-07-14 13:44:57.0|
    |11 |1   |APP1|2017-07-15 13:45:03.0|
    |12 |1   |助手2 |2017-07-07 13:45:08.0|
    +---+----+----+---------------------+
    
    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    +---+----+----+--------------------+
    only showing top 3 rows
    

    2.2 collect获取所有数据到数组

    不同于前面的show方法,这里的collect方法会将df中的所有数据都获取到,并返回一个Array对象。

    df.collect().foreach(println)
    

    输出为:

    [1,1,助手1,2017-08-10 13:44:19.0]
    [2,1,APP1,2017-08-04 13:44:26.0]
    [3,2,助手1,2017-08-05 13:44:29.0]
    [4,2,助手1,2017-08-07 13:44:32.0]
    [5,3,APP1,2017-08-02 13:44:38.0]
    [6,3,APP1,2017-08-01 13:44:41.0]
    [7,3,助手2,2017-08-14 13:44:48.0]
    [8,3,APP2,2017-08-03 13:44:45.0]
    [9,2,APP2,2017-08-11 13:44:53.0]
    [10,2,助手1,2017-07-14 13:44:57.0]
    [11,1,APP1,2017-07-15 13:45:03.0]
    [12,1,助手2,2017-07-07 13:45:08.0]
    

    2.3 collectAsList:获取所有数据到List

    功能和collect类似,只不过将返回结构变成了List对象,使用方法如下:

    println(df.collectAsList())
    

    输出为:

    [[1,1,助手1,2017-08-10 13:44:19.0], [2,1,APP1,2017-08-04 13:44:26.0], [3,2,助手1,2017-08-05 13:44:29.0], [4,2,助手1,2017-08-07 13:44:32.0], [5,3,APP1,2017-08-02 13:44:38.0], [6,3,APP1,2017-08-01 13:44:41.0], [7,3,助手2,2017-08-14 13:44:48.0], [8,3,APP2,2017-08-03 13:44:45.0], [9,2,APP2,2017-08-11 13:44:53.0], [10,2,助手1,2017-07-14 13:44:57.0], [11,1,APP1,2017-07-15 13:45:03.0], [12,1,助手2,2017-07-07 13:45:08.0]]
    

    2.4describe(cols: String*):获取指定字段的统计信息

    这个方法可以动态的传入一个或多个String类型的字段名,结果仍然为DataFrame对象,用于统计数值类型字段的统计值,比如count, mean, stddev, min, max等。

    df .describe("user" ).show()
    

    输出为:

    +-------+------------------+
    |summary|              user|
    +-------+------------------+
    |  count|                12|
    |   mean|               2.0|
    | stddev|0.8528028654224418|
    |    min|                 1|
    |    max|                 3|
    +-------+------------------+
    

    2.5first, head, take, takeAsList:获取若干行记录

    这里列出的四个方法比较类似,其中
    (1)first获取第一行记录
    (2)head获取第一行记录,head(n: Int)获取前n行记录
    (3)take(n: Int)获取前n行数据
    (4)takeAsList(n: Int)获取前n行数据,并以List的形式展现
    以Row或者Array[Row]的形式返回一行或多行数据。first和head功能相同。
    take和takeAsList方法会将获得到的数据返回到Driver端,所以,使用这两个方法时需要注意数据量,以免Driver发生OutOfMemoryError

    3、单个DataFrame操作

    3.1 使用where筛选条件

    where(conditionExpr: String):SQL语言中where关键字后的条件 ,传入筛选条件表达式,可以用and和or。得到DataFrame类型的返回结果, 比如我们想得到用户1或者使用助手1的操作记录:

    df.where("user=1 or type ='助手1'").show()
    

    输出为

    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    |  4|   2| 助手1|2017-08-07 13:44:...|
    | 10|   2| 助手1|2017-07-14 13:44:...|
    | 11|   1|APP1|2017-07-15 13:45:...|
    | 12|   1| 助手2|2017-07-07 13:45:...|
    +---+----+----+--------------------+
    

    3.2 filter:根据字段进行筛选

    传入筛选条件表达式,得到DataFrame类型的返回结果。和where使用条件相同,比如我们想得到用户1或者使用助手1的操作记录:

    df.filter("user=1 or type ='助手1'").show()
    

    结果和上面相同:

    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    |  4|   2| 助手1|2017-08-07 13:44:...|
    | 10|   2| 助手1|2017-07-14 13:44:...|
    | 11|   1|APP1|2017-07-15 13:45:...|
    | 12|   1| 助手2|2017-07-07 13:45:...|
    +---+----+----+--------------------+
    

    3.3 select:获取指定字段值

    根据传入的String类型字段名,获取指定字段的值,以DataFrame类型返回,比如我们想要查找user和type两列:

    df.select("user","type").show()
    

    结果为:

    +----+----+
    |user|type|
    +----+----+
    |   1| 助手1|
    |   1|APP1|
    |   2| 助手1|
    |   2| 助手1|
    |   3|APP1|
    |   3|APP1|
    |   3| 助手2|
    |   3|APP2|
    |   2|APP2|
    |   2| 助手1|
    |   1|APP1|
    |   1| 助手2|
    +----+----+
    

    还有一个重载的select方法,不是传入String类型参数,而是传入Column类型参数,Column类型即DataFrame中的一列。可以实现select id, id+1 from pivot这种逻辑。

    df.select(df("user"),df("user")+1).show()
    

    输出为

    +----+----------+
    |user|(user + 1)|
    +----+----------+
    |   1|       2.0|
    |   1|       2.0|
    |   2|       3.0|
    |   2|       3.0|
    |   3|       4.0|
    |   3|       4.0|
    |   3|       4.0|
    |   3|       4.0|
    |   2|       3.0|
    |   2|       3.0|
    |   1|       2.0|
    |   1|       2.0|
    +----+----------+
    

    3.4selectExpr:可以对指定字段进行特殊处理

    可以直接对指定字段调用UDF函数,或者指定别名等。传入String类型参数,得到DataFrame对象。 比如,将type字段重新命名为visittype,同时截取visittime的date:

    df.selectExpr("user","type as visittype","to_date(visittime)").show()
    

    输出为:

    +----+---------+--------------------------------+
    |user|visittype|to_date(CAST(visittime AS DATE))|
    +----+---------+--------------------------------+
    |   1|      助手1|                      2017-08-10|
    |   1|     APP1|                      2017-08-04|
    |   2|      助手1|                      2017-08-05|
    |   2|      助手1|                      2017-08-07|
    |   3|     APP1|                      2017-08-02|
    |   3|     APP1|                      2017-08-01|
    |   3|      助手2|                      2017-08-14|
    |   3|     APP2|                      2017-08-03|
    |   2|     APP2|                      2017-08-11|
    |   2|      助手1|                      2017-07-14|
    |   1|     APP1|                      2017-07-15|
    |   1|      助手2|                      2017-07-07|
    +----+---------+--------------------------------+
    

    3.5 col/apply:获取指定字段

    只能获取一个字段,返回对象为Column类型。 示例略

    3.6 drop:去除指定字段,保留其他字段

    返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。比如我们去除type字段:

    df.drop("type").show()
    

    输出为:

    +---+----+--------------------+
    | id|user|           visittime|
    +---+----+--------------------+
    |  1|   1|2017-08-10 13:44:...|
    |  2|   1|2017-08-04 13:44:...|
    |  3|   2|2017-08-05 13:44:...|
    |  4|   2|2017-08-07 13:44:...|
    |  5|   3|2017-08-02 13:44:...|
    |  6|   3|2017-08-01 13:44:...|
    |  7|   3|2017-08-14 13:44:...|
    |  8|   3|2017-08-03 13:44:...|
    |  9|   2|2017-08-11 13:44:...|
    | 10|   2|2017-07-14 13:44:...|
    | 11|   1|2017-07-15 13:45:...|
    | 12|   1|2017-07-07 13:45:...|
    +---+----+--------------------+
    

    3.7 limit

    limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。和take与head不同的是,limit方法不是Action操作。比如获得前3条记录:

    df.limit(3).show()
    

    输出为

    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    +---+----+----+--------------------+
    

    3.8 orderBy和sort

    orderBy和sort:按指定字段排序,默认为升序 ,例如,按照时间字段进行排序:

    df.orderBy("visittime").show(false)
    

    输出为:

    +---+----+----+---------------------+
    |id |user|type|visittime            |
    +---+----+----+---------------------+
    |12 |1   |助手2 |2017-07-07 13:45:08.0|
    |10 |2   |助手1 |2017-07-14 13:44:57.0|
    |11 |1   |APP1|2017-07-15 13:45:03.0|
    |6  |3   |APP1|2017-08-01 13:44:41.0|
    |5  |3   |APP1|2017-08-02 13:44:38.0|
    |8  |3   |APP2|2017-08-03 13:44:45.0|
    |2  |1   |APP1|2017-08-04 13:44:26.0|
    |3  |2   |助手1 |2017-08-05 13:44:29.0|
    |4  |2   |助手1 |2017-08-07 13:44:32.0|
    |1  |1   |助手1 |2017-08-10 13:44:19.0|
    |9  |2   |APP2|2017-08-11 13:44:53.0|
    |7  |3   |助手2 |2017-08-14 13:44:48.0|
    +---+----+----+---------------------+
    

    如果想要降序排序,可以使用如下的方法:

    df.orderBy(df("visittime").desc).show(false)
    

    输出为:

    +---+----+----+---------------------+
    |id |user|type|visittime            |
    +---+----+----+---------------------+
    |7  |3   |助手2 |2017-08-14 13:44:48.0|
    |9  |2   |APP2|2017-08-11 13:44:53.0|
    |1  |1   |助手1 |2017-08-10 13:44:19.0|
    |4  |2   |助手1 |2017-08-07 13:44:32.0|
    |3  |2   |助手1 |2017-08-05 13:44:29.0|
    |2  |1   |APP1|2017-08-04 13:44:26.0|
    |8  |3   |APP2|2017-08-03 13:44:45.0|
    |5  |3   |APP1|2017-08-02 13:44:38.0|
    |6  |3   |APP1|2017-08-01 13:44:41.0|
    |11 |1   |APP1|2017-07-15 13:45:03.0|
    |10 |2   |助手1 |2017-07-14 13:44:57.0|
    |12 |1   |助手2 |2017-07-07 13:45:08.0|
    +---+----+----+---------------------+
    

    3.9 group by数据分组

    groupBy方法有两种调用方式,可以传入String类型的字段名,也可传入Column类型的对象。
    使用方法如下:

    df.groupBy("user")
    df.groupBy(df("user"))
    

    groupBy方法之后得到的是GroupedData类型对象,不能直接接show方法来展示DataFrame,还需要跟一些分组统计函数,常用的统计函数有:
    max(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的最大值,只能作用于数字型字段
    min(colNames: String
    )方法,获取分组中指定字段或者所有的数字类型字段的最小值,只能作用于数字型字段
    mean(colNames: String)方法,获取分组中指定字段或者所有的数字类型字段的平均值,只能作用于数字型字段
    sum(colNames: String
    )方法,获取分组中指定字段或者所有的数字类型字段的和值,只能作用于数字型字段
    count()方法,获取分组中的元素个数
    例如下面的例子:

    df.groupBy("user").max("id").show()
    df.groupBy(df("user")).max("id").show()
    

    输出为:

    +----+-------+
    |user|max(id)|
    +----+-------+
    |   3|      8|
    |   1|     12|
    |   2|     10|
    +----+-------+
    

    我们还经常想要实现一个类似excel数据透视表的功能,这里就需要用到pivot函数,比如要统计每个用户通过各种渠道下单的次数:

    df.groupBy(df("user")).pivot("type").count().show()
    

    输出为:

    +----+----+----+----+----+
    |user|APP1|APP2| 助手1| 助手2|
    +----+----+----+----+----+
    |   3|   2|   1|null|   1|
    |   1|   2|null|   1|   1|
    |   2|null|   1|   3|null|
    +----+----+----+----+----+
    

    3.10 distinct数据去重

    使用distinct:返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。

    3.11 dropDuplicates:根据指定字段去重

    跟distinct方法不同的是,此方法可以根据指定字段去重。例如我们想要去掉相同用户通过相同渠道下单的数据:

    df.dropDuplicates("user","type").show()
    

    输出为:

    +---+----+----+--------------------+
    | id|user|type|           visittime|
    +---+----+----+--------------------+
    |  8|   3|APP2|2017-08-03 13:44:...|
    |  1|   1| 助手1|2017-08-10 13:44:...|
    |  7|   3| 助手2|2017-08-14 13:44:...|
    | 12|   1| 助手2|2017-07-07 13:45:...|
    |  3|   2| 助手1|2017-08-05 13:44:...|
    |  5|   3|APP1|2017-08-02 13:44:...|
    |  9|   2|APP2|2017-08-11 13:44:...|
    |  2|   1|APP1|2017-08-04 13:44:...|
    +---+----+----+--------------------+
    

    3.11 agg方法实现聚合操作

    聚合操作调用的是agg方法,该方法有多种调用方式。一般与groupBy方法配合使用。
    比如我们查找最大的id,并把所有的user值相加,这里只是为了演示代码的作用:

    df.agg("id"->"max","user"->"sum").show()
    

    输出为:

    +-------+---------+
    |max(id)|sum(user)|
    +-------+---------+
    |     12|     24.0|
    +-------+---------+
    

    3.12 withColumn添加新的一列

    我们可以使用withColumn方法为DataFrame添加新的一列,这个方法指定两个参数,一个是列名,一个是值,值需要是Column对象:

    df.withColumn("sex",df("user")%2).show()
    

    输出为

    +---+----+----+--------------------+---+
    | id|user|type|           visittime|sex|
    +---+----+----+--------------------+---+
    |  1|   1| 助手1|2017-08-10 13:44:...|1.0|
    |  2|   1|APP1|2017-08-04 13:44:...|1.0|
    |  3|   2| 助手1|2017-08-05 13:44:...|0.0|
    |  4|   2| 助手1|2017-08-07 13:44:...|0.0|
    |  5|   3|APP1|2017-08-02 13:44:...|1.0|
    |  6|   3|APP1|2017-08-01 13:44:...|1.0|
    |  7|   3| 助手2|2017-08-14 13:44:...|1.0|
    |  8|   3|APP2|2017-08-03 13:44:...|1.0|
    |  9|   2|APP2|2017-08-11 13:44:...|0.0|
    | 10|   2| 助手1|2017-07-14 13:44:...|0.0|
    | 11|   1|APP1|2017-07-15 13:45:...|1.0|
    | 12|   1| 助手2|2017-07-07 13:45:...|1.0|
    +---+----+----+--------------------+---+
    

    4、两个DataFrame操作

    首先,我们先来创建一个用户性别表,并读入新的DataFrame中。

    val df2 = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", "user")
          .option("user", "root")
          .option("password", "admin")
          .load()
    
    df2.show()
    
    +----+---+
    |user|sex|
    +----+---+
    |   1|  男|
    |   2|  女|
    |   5|  男|
    +----+---+
    

    4.1 join链接

    首先,我们可以通过join函数实现两个DataFrame的链接操作,并要指定链接字段:

    df.join(df2,"user").show()
    

    输出为:

    +----+---+----+--------------------+---+
    |user| id|type|           visittime|sex|
    +----+---+----+--------------------+---+
    |   1|  1| 助手1|2017-08-10 13:44:...|  男|
    |   1|  2|APP1|2017-08-04 13:44:...|  男|
    |   1| 11|APP1|2017-07-15 13:45:...|  男|
    |   1| 12| 助手2|2017-07-07 13:45:...|  男|
    |   2|  3| 助手1|2017-08-05 13:44:...|  女|
    |   2|  4| 助手1|2017-08-07 13:44:...|  女|
    |   2|  9|APP2|2017-08-11 13:44:...|  女|
    |   2| 10| 助手1|2017-07-14 13:44:...|  女|
    +----+---+----+--------------------+---+
    

    如果我们有多个字段,可以使用:

    df.join(df2,Seq("id","user"))
    

    上面两个指定链接字段的形式称为using形式,因为类似于a join b using column1的形式,当然也可以使用Column类型来join,注意是三个等号:

    df.join(df2,df("user")===df2("user"))
    

    我们可以看到,默认的链接方式是内链接,当然我们已可以使用其他的方式,通过第三个参数来指定。我们可以指定的类型有inner, outer, left_outer, right_outer, leftsemi类型,不过只有using形式指定两个及以上字段以及使用Column类型来链接的时候可以指定链接方式。

    比如下面的方式是错误的:

    df.join(df2,"user","outer").show()
    

    比如我们使用外链接:

    df.join(df2,df("user")===df2("user"),"outer").show()
    

    结果为:

    +----+----+----+--------------------+----+----+
    |  id|user|type|           visittime|user| sex|
    +----+----+----+--------------------+----+----+
    |   1|   1| 助手1|2017-08-10 13:44:...|   1|   男|
    |   2|   1|APP1|2017-08-04 13:44:...|   1|   男|
    |  11|   1|APP1|2017-07-15 13:45:...|   1|   男|
    |  12|   1| 助手2|2017-07-07 13:45:...|   1|   男|
    |   5|   3|APP1|2017-08-02 13:44:...|null|null|
    |   6|   3|APP1|2017-08-01 13:44:...|null|null|
    |   7|   3| 助手2|2017-08-14 13:44:...|null|null|
    |   8|   3|APP2|2017-08-03 13:44:...|null|null|
    |   3|   2| 助手1|2017-08-05 13:44:...|   2|   女|
    |   4|   2| 助手1|2017-08-07 13:44:...|   2|   女|
    |   9|   2|APP2|2017-08-11 13:44:...|   2|   女|
    |  10|   2| 助手1|2017-07-14 13:44:...|   2|   女|
    |null|null|null|                null|   5|   男|
    +----+----+----+--------------------+----+----+
    

    未完待续,在学习过程中不断补充!

    相关文章

      网友评论

        本文标题:Spark-SQL之DataFrame基本操作

        本文链接:https://www.haomeiwen.com/subject/yceirxtx.html