1.达成目标
读取采用逗号分隔的txt文件数据,通过sql对于txt文件中的数据进行查询
2.实现
2.1 数据示例
Michael, 29
Andy, 30
Justin, 19
Justins, 16
Justinv, 20
Justind, 99
2.2 思路
读取文件,形成javaRDD对象,对于每行数据进行分隔,形成以Row封装的JavaRDD<Row>对象,定义数据的格式后,结合Row对象,生成新的Dataset<Row>,调用sqlContext的sql方法,对于Dataset<Row>进行查询,得到结果
2.3 实现
代码:
package com.surfilter.spark.java;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
/**
*
*/
/**
* description: 将RDD通过java编程方式进行转换,生成DataFrame对象 <br>
* date: 2020-3-28 <br>
* author: Alex <br>
* version: 1.0 <br>
*/
public class RDD2DataFrameByTxt {
public static void main(String args[]) {
//创建一个sparksession
SparkSession spark = SparkSession
.builder()
.appName("RDD2DataFrameByTxt")
.master("local")
.getOrCreate();
//读取文件,创建一个javaRDD,读取文件的textFile获取的是RDD方法,需要使用toJavaRDD,转换为javaRDD对象
//这里的数据结构为 "yaohao",
JavaRDD<String> lines = spark.sparkContext().textFile("/Users/yaohao/tools/spark-2.4.5-bin-hadoop2.7/examples/src/main/resources/people.txt", 1).toJavaRDD();
//将一行数据转变为Row进行封装
JavaRDD<Row> df = lines.map(line -> {
String[] sp = line.split(",");
return RowFactory.create(sp[0], Integer.parseInt(sp[1].trim()));
});
//数据的字段结构
StructType rfSchema = new StructType(new StructField[]{
new StructField("name", DataTypes.StringType, true, Metadata.empty()),
new StructField("age", DataTypes.IntegerType, true, Metadata.empty())});
//生成一个结构化后的数据
Dataset<Row> rfDataset = spark.createDataFrame(df, rfSchema);
//在结果集不增加的情况下,缓存RDD,方便后续进行查询以及分析
rfDataset.cache();
//可以理解为注册城一张表,支持后面采用sql方式进行查询
rfDataset.registerTempTable("person");
Dataset<Row> result = rfDataset.sqlContext().sql("select sum(age) from person where age>=18");
result.show();
}
}
网友评论