1. Spark DataFrame Overview
In Spark terms, a DataFrame is a distributed collection of rows. You can think of it as a table in a relational database, or as an Excel sheet with column headers. Like an RDD, it has the following properties:
- Immutable: once an RDD or DataFrame has been created it cannot be changed; transformations only produce new RDDs or DataFrames.
- Lazily evaluated: transformations are only executed when an action triggers them.
- Distributed: a DataFrame, like an RDD, is distributed across the cluster.
1.1 Creating a DataFrame
Supported data sources:
- Parquet Files
- ORC Files
- JSON Files
- Hive Tables
- JDBC
- Avro Files
Syntax for creating a DataFrame:
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
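The other formats listed above go through the same read() entry point; a minimal sketch, with hypothetical file paths:
// Format-specific readers (paths here are placeholders)
Dataset<Row> parquetDF = spark.read().parquet("examples/src/main/resources/users.parquet");
Dataset<Row> orcDF = spark.read().orc("examples/src/main/resources/users.orc");
// Avro needs the external spark-avro package on the classpath
Dataset<Row> avroDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");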
The entry point for Spark SQL: SparkSession
Code:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from any Spark data source.
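As a sketch of the first of those paths — building a DataFrame from in-memory objects or an existing RDD — assuming a hypothetical Person JavaBean with name/age fields and the usual imports (java.util.*, org.apache.spark.api.java.*):
// From a List of JavaBeans (Person is a hypothetical bean with getters/setters)
List<Person> people = Arrays.asList(new Person("Michael", 12), new Person("Andy", 13));
Dataset<Row> fromList = spark.createDataFrame(people, Person.class);
// From an existing JavaRDD of the same beans
JavaRDD<Person> rdd = new JavaSparkContext(spark.sparkContext()).parallelize(people);
Dataset<Row> fromRDD = spark.createDataFrame(rdd, Person.class);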
1.1.1 Creating a DataFrame from a JSON file
JSON test file:
{"name": "Michael", "age": 12}
{"name": "Andy", "age": 13}
{"name": "Justin", "age": 8}
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest4 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest4")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Dataset<Row> df = spark.read().json("file:///home/pyspark/test.json");
df.show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/fc6b2e2335f3adc6.png)
1.1.2 Creating a DataFrame from a CSV file
CSV test file:
![](https://img.haomeiwen.com/i2638478/690ec297ef41a5f9.png)
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest5 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest4")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Dataset<Row> df = spark.read().format("csv").option("header", "true").load("file:///home/pyspark/emp.csv");
df.show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/85af2ce8336ebfa0.png)
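One practical note on the CSV reader: with only the header option, every column is read back as a string. The built-in inferSchema option asks Spark to sample the file and infer numeric/date types instead; a sketch on the same (hypothetical) path:
Dataset<Row> typedDF = spark.read().format("csv")
        .option("header", "true")
        .option("inferSchema", "true")   // sample the file and infer column types instead of all strings
        .load("file:///home/pyspark/emp.csv");
typedDF.printSchema();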
1.1.3 Creating a DataFrame from a Hive table
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest2 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest2")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Dataset<Row> sqlDF = spark.sql("SELECT * FROM test.ods_fact_sale limit 100");
sqlDF.show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/ac7c8c4d209b7576.png)
1.1.4 Creating a DataFrame from a JDBC source
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest3 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest3")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://10.31.1.123:3306/test")
.option("dbtable", "(SELECT * FROM EMP) tmp")
.option("user", "root")
.option("password", "abc123")
.load();
jdbcDF.printSchema();
jdbcDF.show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/4e726d8def43fbd5.png)
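A couple of practical points about the JDBC source, sketched under the assumption that the MySQL Connector/J jar is on the driver and executor classpath (URL and credentials are the same placeholders as above):
Dataset<Row> jdbcDF2 = spark.read().format("jdbc")
        .option("url", "jdbc:mysql://10.31.1.123:3306/test")
        .option("driver", "com.mysql.cj.jdbc.Driver")   // explicit driver class (Connector/J 8.x)
        .option("dbtable", "EMP")                        // a plain table name also works, not just a subquery
        .option("user", "root")
        .option("password", "abc123")
        .load();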
2. Spark SQL in Practice
We use the four tables from the classic SCOTT schema to walk through Spark SQL:
emp
dept
bonus
salgrade
2.1 DataFrame summary statistics
A DataFrame can produce summary statistics for its columns via describe(), somewhat like the statistics a relational database keeps for its tables.
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest7 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest7")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");
sqlDF.describe().show();
spark.stop();
}
}
Test output:
As the listing below shows, describe() produces statistics for every column:
- count: number of non-null values in the column
- mean: average
- stddev: standard deviation
- min: minimum value
- max: maximum value
2.2 The DataFrame select operation
In some scenarios we only need a subset of a DataFrame's columns; select does exactly that.
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest8 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest8")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");
sqlDF.select("ename","hiredate").show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/40eebd7c5dbbf583.png)
2.3 Column operations on a DataFrame
In some scenarios we need to rename columns, add columns, or drop columns.
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest9 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest8")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");
//print the column names
System.out.println("\n" + "\n" + "\n");
for ( String col:sqlDF.columns() ){
System.out.println(col);
}
System.out.println("\n" + "\n" + "\n");
//drop a column
sqlDF.drop("comm").show();
//add (or replace) a column; withColumn takes a Column, not a string
//sqlDF.withColumn("new_comm", sqlDF.col("sal")).show();
//rename a column
sqlDF.withColumnRenamed("comm","comm_new").show();
spark.stop();
}
}
Test output:
Column names:
![](https://img.haomeiwen.com/i2638478/c26dbf42c84bfaf4.png)
Dropping a column:
![](https://img.haomeiwen.com/i2638478/ce5f2ad9889d9e80.png)
Renaming a column:
![](https://img.haomeiwen.com/i2638478/e69fed9bcb966a7f.png)
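Besides renaming, withColumn can also derive a new column from an expression built with the Column API; a minimal sketch on the same emp columns (assumes import org.apache.spark.sql.functions):
// New column total_pay = sal + coalesce(comm, 0)
sqlDF.withColumn("total_pay",
        sqlDF.col("sal").plus(functions.coalesce(sqlDF.col("comm"), functions.lit(0))))
     .show();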
2.4 Filtering data
Filtering is done with filter(); you can also use where(), which is an alias for filter().
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest10 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest10")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");
sqlDF.where("comm is not null").show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/f7b40de0d76440c1.png)
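The same condition can be expressed with the Column API instead of a SQL string; a sketch:
// Equivalent to where("comm is not null"); filter() accepts a Column predicate
sqlDF.filter(sqlDF.col("comm").isNotNull()).show();
// Predicates can be combined, e.g. comm is not null and sal > 1500
sqlDF.filter(sqlDF.col("comm").isNotNull().and(sqlDF.col("sal").gt(1500))).show();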
2.5 Simple aggregations
Commonly used aggregation functions:
Operation | Description |
---|---|
avg/mean | average |
count | number of values |
countDistinct | number of distinct values |
max | maximum |
min | minimum |
sum | sum |
sumDistinct | sum of distinct values |
skewness | skewness |
stddev | standard deviation |
2.5.1 Basic aggregation
Code:
package org.example;
import org.apache.spark.sql.*;
public class SparkSQLTest11 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest11")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");
sqlDF.groupBy("deptno").agg(functions.avg("sal").alias("avg_sal"),
functions.max("comm").alias("max_comm")).show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/dafffc314f04955e.png)
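The other functions in the table above are used the same way. As a sketch, an aggregation over the whole DataFrame (no groupBy) with countDistinct and sum:
// Whole-table aggregation: number of distinct departments and total salary
sqlDF.agg(functions.countDistinct("deptno").alias("dept_cnt"),
        functions.sum("sal").alias("total_sal")).show();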
2.6 User-defined functions
For more complex logic we can implement it as a user-defined function (UDF).
Code:
package org.example;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
public class SparkSQLTest12 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest12")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.udf().register("plusOne", new UDF1<Integer, Integer>() {
@Override
public Integer call(Integer x) {
return x + 1;
}
}, DataTypes.IntegerType);
spark.sql("SELECT plusOne(5)").show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/d3412772e1b97de7.png)
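Since UDF1 is a functional interface, the same registration can be written with a lambda, and a registered UDF can also be invoked from the DataFrame API through functions.callUDF; a sketch (df below stands for any DataFrame with an integer column age):
// Register the same UDF with a lambda (the cast picks the UDF1 overload)
spark.udf().register("plusOne", (UDF1<Integer, Integer>) x -> x + 1, DataTypes.IntegerType);
// Call it on a DataFrame column
df.select(functions.callUDF("plusOne", df.col("age"))).show();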
2.7 Joins
Syntax (the PySpark signature; the Java equivalent is Dataset.join(Dataset<?> right, Column joinExprs, String joinType), used in the examples below):
DataFrame.join(other, on=None, how=None)
- other: the DataFrame to join with
- on: str, list or Column, optional
- how: str, optional; defaults to inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti
2.7.1 Inner join
Code:
package org.example;
import org.apache.spark.sql.*;
public class SparkSQLTest13 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest13")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> df1 = spark.sql("SELECT * FROM emp");
Dataset<Row> df2 = spark.sql("SELECT * FROM dept");
Dataset<Row> df3 = df1.join(df2, df1.col("deptno").equalTo(df2.col("deptno")) ,"inner").select(df1.col("empno"),df1.col("ename"),df2.col("dname"),df2.col("loc"));
df3.show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/badcf79714113f65.png)
2.7.2 Outer join
Here we use a right outer join.
Code:
package org.example;
import org.apache.spark.sql.*;
public class SparkSQLTest14 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest14")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> df1 = spark.sql("SELECT * FROM emp");
Dataset<Row> df2 = spark.sql("SELECT * FROM dept");
Dataset<Row> df3 = df1.join(df2, df1.col("deptno").equalTo(df2.col("deptno")) ,"right").select(df1.col("empno"),df1.col("ename"),df2.col("dname"),df2.col("loc"));
df3.show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/a5cc265f360ab62c.png)
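The how argument also accepts semi and anti joins, which are not demonstrated above. As a sketch, a left_anti join returns the emp rows whose deptno has no match in dept (only left-side columns are kept):
Dataset<Row> noDept = df1.join(df2, df1.col("deptno").equalTo(df2.col("deptno")), "left_anti");
noDept.show();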
2.8 Sorting
Syntax (again the PySpark signature; in Java, orderBy(Column... sortExprs) with Column.asc()/desc(), as in the code below):
DataFrame.orderBy(*cols, **kwargs)
-- returns a new DataFrame sorted by the specified column(s)
Parameter: ascending — bool or list, optional. A boolean or list of booleans (default True) choosing ascending vs. descending order. Pass a list to specify multiple sort orders; if a list is given, its length must equal the number of columns.
Code:
package org.example;
import org.apache.spark.sql.*;
public class SparkSQLTest15 {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest15")
.config("spark.some.config.option", "some-value")
.getOrCreate();
spark.sql("use test");
Dataset<Row> df1 = spark.sql("SELECT * FROM emp");
Dataset<Row> df2 = spark.sql("SELECT * FROM dept");
Dataset<Row> df3 = df1.join(df2, df1.col("deptno").equalTo(df2.col("deptno")) ,"right").select(df1.col("empno"),df1.col("ename"),df2.col("dname"),df2.col("loc"));
Dataset<Row> df4 = df3.orderBy(df3.col("dname").desc(),df3.col("ename").asc() );
df4.show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/56362b7a0f6743a2.png)
2.9 Running SQL over files
The sql function on a SparkSession lets an application run SQL queries programmatically and returns the result as a Dataset<Row>.
Code:
package org.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkSQLTest16 {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("SparkSQLTest16")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Dataset<Row> df = spark.read().json("file:///home/pyspark/test.json");
df.createOrReplaceTempView("people");
spark.sql("select * from people where age = 12").show();
spark.stop();
}
}
Test output:
![](https://img.haomeiwen.com/i2638478/b4938896c8bad527.png)
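Spark SQL can also query a file directly, without registering a temporary view first, by prefixing the path with the format name; a sketch on the same JSON file:
// Run SQL directly on the file: format name, then the path in backticks
spark.sql("SELECT * FROM json.`file:///home/pyspark/test.json` WHERE age = 12").show();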