Spark SQL
Creating a DataFrame and basic operations
A DataFrame can be thought of as a table in a relational database; the difference from an RDD is that a DataFrame carries schema information.
public class DataFrameCreate {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("DataFrame");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame dataFrame = sqlContext.read()
.json(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath());
// Print the DataFrame
dataFrame.show();
// Print the DataFrame's schema
dataFrame.printSchema();
// Select a single column
dataFrame.select(dataFrame.col("name")).show();
// Select several columns and apply a computation to one of them
dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show();
// Filter rows by the value of a column
dataFrame.filter(dataFrame.col("age").gt(20)).show();
// Group by a column, then aggregate
dataFrame.groupBy(dataFrame.col("name")).count().show();
}
}
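// The same example implemented in Scala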
object DataFrameCreate extends App {
val conf = new SparkConf().setMaster("local").setAppName("DataFrame")
val sc = SparkContext.getOrCreate(conf)
// Create the SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
// Create the DataFrame
val dataFrame = sqlContext.read.json(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
dataFrame.show()
dataFrame.printSchema()
dataFrame.select(dataFrame.col("name")).show()
dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show()
dataFrame.filter(dataFrame.col("age").gt(20)).show()
dataFrame.groupBy(dataFrame.col("name")).count().show()
}
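Because a DataFrame carries schema information, the same queries can also be written in SQL once the DataFrame is registered as a temporary table. A minimal Scala sketch of that alternative (the temporary-table name student is arbitrary):
val conf = new SparkConf().setMaster("local").setAppName("DataFrameSQL")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
val dataFrame = sqlContext.read.json(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
// Register the DataFrame as a temporary table and query it with SQL
dataFrame.registerTempTable("student")
sqlContext.sql("select name, age + 1 from student where age > 20").show()
sqlContext.sql("select name, count(*) as cnt from student group by name").show()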
Converting an RDD to a DataFrame
Using reflection to convert an RDD to a DataFrame
/**
* Java implementation
* Converts an RDD to a DataFrame via reflection
*/
public class RDD2DataFrameReflection {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameReflection");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Read the file into an RDD and map each line to a Person
JavaRDD<Person> initRDD = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
.map(line -> line.split(" "))
.map(array -> new Person(array[0], Integer.parseInt(array[1]), array[2]));
// Create a DataFrame via reflection and register it as a temporary table (person)
sqlContext.createDataFrame(initRDD, Person.class).registerTempTable("person");
// Run a SQL query to select rows with age greater than 18
DataFrame personDataFrame = sqlContext.sql("select name,age,sex from person where age > 18");
// Print the data
personDataFrame.show();
// Convert the DataFrame back to an RDD; the element type is now Row, so map each Row back to a Person and print the results
personDataFrame.javaRDD()
.map(row -> new Person(row.getString(0), row.getInt(1), row.getString(2)))
.collect()
.forEach(System.out::println);
}
/**
* When converting an RDD to a DataFrame via reflection, the JavaBean class must be declared public and must implement Serializable
*/
public static class Person implements Serializable {
private String name;
private Integer age;
private String sex;
public Person() {
}
public Person(String name, Integer age, String sex) {
this.name = name;
this.age = age;
this.sex = sex;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
", sex='" + sex + '\'' +
'}';
}
}
}
// Scala implementation of converting between an RDD and a DataFrame
object RDD2DataFrameReflection {
// Declare a case class to act as the bean
case class Person(name: String, age: Int, sex: String)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
// In Scala, converting an RDD to a DataFrame requires importing the implicit conversions
import sqlContext.implicits._
// Read the text file into an RDD, convert it to a DataFrame with toDF, and register the DataFrame as a temporary table
sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
.map(_.split(" "))
.map(array => Person(array(0), array(1).trim.toInt, array(2)))
.toDF()
.registerTempTable("person")
// The DataFrame's rdd method returns an RDD of Row; map each Row to a Person and print the results
sqlContext.sql("select name,age,sex from person where age > 18")
.rdd
.map(row => Person(row.getString(0), row.getInt(1), row.getString(2)))
.collect
.foreach(println)
}
}
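Both the Java and Scala versions above expect a space-delimited person.txt (name, age, sex) on the classpath. The file itself is not shown here; a hypothetical three-line sample that the parsing logic would accept:
Tom 25 male
Lucy 23 female
Jack 17 male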
Creating the schema dynamically in code
// Java implementation of converting between an RDD and a DataFrame with a programmatically defined schema
public class RDD2DataFrameProgrammatically {
public static void main(String[] args) {
// Create the SparkContext and SQLContext
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Read the file into an RDD and map each line to a Row
JavaRDD<Row> rdd = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
.map(line -> line.split(" "))
.map(array -> RowFactory.create(array[0], Integer.parseInt(array[1]), array[2]));
// Build the schema metadata
List<StructField> structFields = Arrays.asList(
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true),
DataTypes.createStructField("sex", DataTypes.StringType, true)
);
StructType structType = DataTypes.createStructType(structFields);
// Convert the RDD to a DataFrame and register it as a temporary table
sqlContext.createDataFrame(rdd, structType).registerTempTable("person");
sqlContext.sql("select name,age,sex from person where age > 18")
.javaRDD()
.map(row -> new RDD2DataFrameReflection.Person(row.getString(0), row.getInt(1), row.getString(2)))
.collect()
.forEach(System.out::println);
}
}
// Scala implementation of converting between an RDD and a DataFrame with a programmatically defined schema
object RDD2DataFrameProgrammatically {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
// Read the text file into an RDD
val rdd = sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
.map(_.split(" "))
.map(array => Row(array(0), array(1).toInt, array(2)))
// Build the schema metadata
val structType = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("sex", StringType, true)
))
// Convert the RDD to a DataFrame and register it as a temporary table
sqlContext.createDataFrame(rdd, structType).registerTempTable("person")
// Query users with age greater than 18, convert the result back to an RDD, and print it
sqlContext.sql("select name,age,sex from person where age > 18")
.rdd
.map(row => RDD2DataFrameReflection.Person(row.getString(0), row.getInt(1), row.getString(2)))
.collect()
.foreach(println)
}
}
The load and save methods
// Java example showing how to use the load and save methods
public static void loadAndSave() {
SparkConf conf = new SparkConf().setAppName("LoadAndSave").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// When no format is specified, load and save default to parquet
sqlContext.read()
.load(Thread.currentThread().getContextClassLoader().getResource("student.parquet").getPath())
.write()
.save(Thread.currentThread().getContextClassLoader().getResource("student_copy.parquet").getPath());
// Manually specify the file format for load and save
sqlContext.read()
.format("json")
.load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
.show();
sqlContext.read()
.format("json")
.load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
.write()
.format("json")
.save("file:///Users/garen/Documents/student_copy");
sqlContext.read()
.format("json")
.load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
.write()
.format("json")
.mode(SaveMode.ErrorIfExists)
.save("file:///Users/garen/Documents/student_copy");
}
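// Scala example of the load and save methods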
def loadAndSave(): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("LoadAndSave")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
sqlContext.read
.format("json")
.load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
.show()
val dataFrame = sqlContext.read
.format("json")
.load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
dataFrame.filter(dataFrame.col("age").gt(18))
.write
.format("json")
.mode(SaveMode.ErrorIfExists)
.save("file:///D:/demo")
}
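The SaveMode passed to mode() controls what happens when the output path already exists. A minimal sketch of the four modes, reusing the sqlContext from the example above (the output paths are placeholders):
val df = sqlContext.read.format("json")
  .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
df.write.mode(SaveMode.ErrorIfExists).save("file:///D:/demo_error")  // default: fail if the path already exists
df.write.mode(SaveMode.Append).save("file:///D:/demo_append")        // add the new data to whatever is already there
df.write.mode(SaveMode.Overwrite).save("file:///D:/demo_overwrite")  // replace the existing data
df.write.mode(SaveMode.Ignore).save("file:///D:/demo_ignore")        // silently skip the write if the path already exists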
Parquet data source operations
Loading a parquet data source programmatically
// Java: read a parquet file
public static void loadParquet() {
SparkConf conf = new SparkConf().setMaster("local").setAppName("parquet");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
sqlContext.read().parquet(Thread.currentThread().getContextClassLoader().getResource("tarball/users.parquet").getPath()).registerTempTable("person");
sqlContext.sql("select name from person")
.javaRDD()
.map(row -> row.getString(0))
.collect()
.forEach(System.out::println);
}
// Scala: read a parquet file
def parquet(): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("parquet")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
sqlContext.read
.parquet(Thread.currentThread.getContextClassLoader.getResource("tarball/users.parquet").getPath)
.registerTempTable("users")
sqlContext.sql("select * from users")
.rdd
.map(_.getString(0))
.collect
.foreach(println)
}
Automatic partition discovery
// Java
public static void parquetPartition() {
SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetPartition");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Print the schema; two extra partition columns ("gender", "country") will show up
// Partition column types: numeric and string values are inferred automatically, controlled by "spark.sql.sources.partitionColumnTypeInference.enabled" (enabled by default); when it is disabled, every partition column is typed as string.
sqlContext.read()
.parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet")
.printSchema();
}
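As the comments note, partition-column type inference can be switched off, in which case every partition column comes back as a string. A minimal Scala sketch of toggling that setting (same HDFS path as the Java code above):
val conf = new SparkConf().setMaster("local").setAppName("ParquetPartitionInference")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
// With inference disabled, even numeric-looking partition values are read back as strings
sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
sqlContext.read
  .parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet")
  .printSchema()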
Automatic schema merging
def mergeSchema(): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MergeSchema")
// Option 1: set spark.sql.parquet.mergeSchema to true on the SparkConf
// conf.set("spark.sql.parquet.mergeSchema", "true")
val sc: SparkContext = SparkContext.getOrCreate(conf)
val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
val nameAndAgeArray: Array[(String, Int)] = Array(("Tom", 23), ("Garen", 21), ("Peter", 25))
val nameAndScoreArray: Array[(String, Int)] = Array(("Tome", 100), ("Garen", 98), ("Peter", 95))
import sqlContext.implicits._
sc.parallelize(nameAndAgeArray)
.toDF("name", "age")
.write
.format("parquet")
.mode(SaveMode.Append)
.save("hdfs://users/hadoop/persons")
sc.parallelize(nameAndScoreArray)
.toDF("name", "score")
.write
.format("parquet")
.mode(SaveMode.Append)
.save("hdfs://users/hadoop/persons")
// Option 2: set mergeSchema to true via option()
sqlContext.read.option("mergeSchema", "true")
.parquet("hdfs://users/hadoop/persons")
.printSchema()
sqlContext.read.option("mergeSchema", "true")
.parquet("data/persons")
.show()
}
/**
* root
* |-- name: string (nullable = true)
* |-- score: integer (nullable = true)
* |-- age: integer (nullable = true)
*
* +-----+-----+----+
* | name|score| age|
* +-----+-----+----+
* | Tom| null| 23|
* |Garen| null| 21|
* |Peter| null| 25|
* | Tome| 100|null|
* |Garen| 98|null|
* |Peter| 95|null|
* +-----+-----+----+
*/
JSON data source operations
Requirement: query the information and scores of students whose score is greater than 80.
public static void jsonOption() {
SparkConf conf = new SparkConf().setMaster("local").setAppName("JsonOption");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Approach 1: create the student-info DataFrame from a JSON file and register it as a temporary table
sqlContext.read().format("json")
.load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
.registerTempTable("student_info");
JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
"{\"id\":\"1001\",\"score\":100}",
"{\"id\":\"1002\",\"score\":79}",
"{\"id\":\"1003\",\"score\":98}"
));
// Approach 2: create the student-score DataFrame from an RDD of JSON strings and register it as a temporary table
sqlContext.read().json(rdd).registerTempTable("student_score");
// Query students whose score is greater than 80 and save the result as a JSON file
sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
.write()
.format("json")
.mode(SaveMode.Overwrite)
.save("data/student_score");
}
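// Scala version of the same JSON example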
def jsonOption(): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("JsonOption")
val sc: SparkContext = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
// Read the JSON data from a file and register it as a temporary table
sqlContext.read
.format("json")
.load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
.registerTempTable("student_info")
// Read the data from an RDD and register it as a temporary table
val jsonArray: Array[String] = Array(
"{\"id\":\"1001\",\"score\":100}",
"{\"id\":\"1002\",\"score\":90}",
"{\"id\":\"1003\",\"score\":80}"
)
val rdd = sc.parallelize(jsonArray)
sqlContext.read.json(rdd).registerTempTable("student_score")
sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
.write
.format("json")
.mode(SaveMode.Overwrite)
.save("data/students_score")
}
Hive data source operations
Use the Hive data source to query the information of users whose score is greater than 80. To work with the Hive data source, copy hive-site.xml into Spark's conf directory and put the MySQL driver JAR on Spark's classpath.
public static void hiveOption() {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("HiveOption");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);
// Drop the student_info table if it already exists
hiveContext.sql("DROP TABLE IF EXISTS hive.student_info").show();
// Create the student_info table
hiveContext.sql("CREATE TABLE IF NOT EXISTS hive.student_info (name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
// Load data into the student_info table
hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_info.txt' INTO TABLE student_info").show();
// Drop the student_score table if it already exists
hiveContext.sql("DROP TABLE IF EXISTS hive.student_score").show();
// Create the student_score table
hiveContext.sql("CREATE TABLE IF NOT EXISTS hive.student_score (name STRING,score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
// Load data into the student_score table
hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_score.txt' INTO TABLE student_score").show();
// Query students whose score is greater than 80 and save the result as a new Hive table
hiveContext.sql("select a.name,a.age,b.score from student_info a left join student_score b on a.name = b.name where b.score > 80")
.saveAsTable("student_info_score");
// Create a DataFrame directly from a Hive table
Row[] rows = hiveContext.table("student_info_score").collect();
Arrays.asList(rows).forEach(System.out::println);
}
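For symmetry with the earlier sections, here is a Scala sketch of the same Hive flow (not part of the original; table names and HDFS paths mirror the Java code, and the hive. database prefix is dropped for simplicity):
def hiveOption(): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("HiveOption")
  val sc = SparkContext.getOrCreate(conf)
  // HiveContext picks up hive-site.xml from Spark's conf directory
  val hiveContext = new HiveContext(sc)
  hiveContext.sql("DROP TABLE IF EXISTS student_info")
  hiveContext.sql("CREATE TABLE IF NOT EXISTS student_info (name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '")
  hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_info.txt' INTO TABLE student_info")
  hiveContext.sql("DROP TABLE IF EXISTS student_score")
  hiveContext.sql("CREATE TABLE IF NOT EXISTS student_score (name STRING, score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '")
  hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_score.txt' INTO TABLE student_score")
  // Save the join result as a new Hive table, then read it back as a DataFrame
  hiveContext.sql("select a.name, a.age, b.score from student_info a join student_score b on a.name = b.name where b.score > 80")
    .write
    .saveAsTable("student_info_score")
  hiveContext.table("student_info_score").show()
}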
JDBC data source operations
public static void jdbc() {
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("JDBCOption");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = SQLContext.getOrCreate(sc.sc());
// Read data from the student_info table and convert it to a pair RDD
Map<String, String> options = new HashMap<>();
options.put("url", "jdbc:mysql://hadoop-cdh:3306/testdb");
options.put("user", "root");
options.put("password", "Slb930802.");
options.put("dbtable", "student_info");
DataFrame student_info_df = sqlContext.read().format("jdbc").options(options).load();
JavaPairRDD<String, Integer> student_info_rdd = student_info_df.javaRDD()
.mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));
// Read data from the student_score table and convert it to a pair RDD
options.put("dbtable", "student_score");
DataFrame student_score_df = sqlContext.read().format("jdbc").options(options).load();
JavaPairRDD<String, Integer> student_score_rdd = student_score_df.javaRDD()
.mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));
// Join the two RDDs, then keep only students with score greater than 80
JavaRDD<Row> good_student_info_rdd = student_info_rdd.join(student_score_rdd)
.map(t -> RowFactory.create(t._1(), t._2()._1(), t._2()._2()))
.filter(row -> row.getInt(2) > 80);
// Convert the RDD to a DataFrame with a programmatically defined schema and print it
List<StructField> structFields = Arrays.asList(
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true),
DataTypes.createStructField("score", DataTypes.IntegerType, true)
);
StructType structType = DataTypes.createStructType(structFields);
sqlContext.createDataFrame(good_student_info_rdd, structType).show();
// Save the RDD's data into the MySQL database
good_student_info_rdd.foreach(row -> {
Class.forName("com.mysql.jdbc.Driver");
Connection conn = null;
PreparedStatement ps = null;
try {
conn = DriverManager.getConnection("jdbc:mysql://hadoop-cdh:3306/testdb", "root", "Slb930802.");
ps = conn.prepareStatement("INSERT INTO good_student_info VALUES(?,?,?)");
ps.setString(1, row.getString(0));
ps.setInt(2, row.getInt(1));
ps.setInt(3, row.getInt(2));
ps.executeUpdate();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (ps != null) ps.close();
if (conn != null) conn.close();
}
});
}
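As an alternative to the row-by-row JDBC inserts above, in Spark 1.4 and later the DataFrame reader and writer can talk to the database directly. A minimal Scala sketch (connection details, table names, and the score > 80 condition mirror the Java code; this is an illustration, not the original implementation):
val conf = new SparkConf().setMaster("local[4]").setAppName("JDBCOption")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
val url = "jdbc:mysql://hadoop-cdh:3306/testdb"
val props = new java.util.Properties()
props.put("user", "root")
props.put("password", "Slb930802.")
// Read both tables as DataFrames
val infoDF = sqlContext.read.jdbc(url, "student_info", props)
val scoreDF = sqlContext.read.jdbc(url, "student_score", props)
// Join on name, keep scores above 80, and append the result to good_student_info
infoDF.join(scoreDF, "name")
  .filter("score > 80")
  .write
  .mode(SaveMode.Append)
  .jdbc(url, "good_student_info", props)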
Spark SQL built-in functions
Counting UV (unique visitors)
def uv(): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("uv")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
// Build the log records and convert them into a DataFrame
val logArray = Array(
"20180115,fpanpxc2gvjd2py0jd2254ng",
"20180115,fpanpxc2gvjd2py0jd2254ng",
"20180115,4xlt0txdgy220f3owkhxv4y1",
"20180116,kz5t0lvcjsbsqacjqwqyhfu4")
val logRDD = sc.parallelize(logArray)
.map(_.split(","))
.map(array => RowFactory.create(array(0), array(1)))
val structType = StructType(Array(
StructField("data", StringType, false),
StructField("sessionId", StringType, false)
))
val logDF = sqlContext.createDataFrame(logRDD, structType)
// Aggregate the column with Spark SQL's built-in functions
// Using the built-in functions requires the following two imports
import sqlContext.implicits._
import org.apache.spark.sql.functions._
logDF.groupBy("data").agg(countDistinct('sessionId).as("uv")).show()
}
Computing the daily total sales
def totalSale(): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("TotalSale")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
// Build the DataFrame
val logArray = Array("20180115,34.3",
"20180115,35.6",
"20180116,99.9")
val logRDD = sc.parallelize(logArray)
.map(_.split(","))
.map(array => Row(array(0), array(1).toDouble))
val structType = StructType(Array(
StructField("data", StringType, false),
StructField("sale", DoubleType, false)
))
// Group and aggregate to compute the total sales per day
import sqlContext.implicits._
import org.apache.spark.sql.functions._
sqlContext.createDataFrame(logRDD, structType)
.groupBy("data")
.agg('data, sum('sale).as("total_sale"))
.show()
}
Spark SQL window functions
The row_number() window function
select url,rate,row_number() over(partition by url order by rate desc) as r from window_test2;
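In Spark 1.x, window functions such as row_number() are typically run through a HiveContext. A hypothetical Scala sketch that builds a small window_test2 table just to make the query above runnable (the (url, rate) rows are made up for illustration):
val conf = new SparkConf().setMaster("local").setAppName("RowNumberWindow")
val sc = SparkContext.getOrCreate(conf)
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
// Hypothetical (url, rate) data, only to give the query something to run against
sc.parallelize(Seq(
  ("http://a.com", 10), ("http://a.com", 30), ("http://a.com", 20),
  ("http://b.com", 50), ("http://b.com", 40)
)).toDF("url", "rate").registerTempTable("window_test2")
// Rank the rows within each url by rate, highest first
hiveContext.sql("select url,rate,row_number() over(partition by url order by rate desc) as r from window_test2").show()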