美文网首页
Spark(txt类型文件转换为DataFrame进行sql查询

Spark(txt类型文件转换为DataFrame进行sql查询

作者: AlexYao | 来源:发表于2020-03-28 16:07 被阅读0次

1.达成目标

读取采用逗号分隔的txt文件数据,通过sql对于txt文件中的数据进行查询

2.实现

2.1 数据示例

Michael, 29
Andy, 30
Justin, 19
Justins, 16
Justinv, 20
Justind, 99

2.2 思路

读取文件,形成javaRDD对象,对于每行数据进行分隔,形成以Row封装的JavaRDD<Row>对象,定义数据的格式后,结合Row对象,生成新的Dataset<Row>,调用sqlContext的sql方法,对于Dataset<Row>进行查询,得到结果

2.3 实现

代码:

package com.surfilter.spark.java;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 *
 */

/**
 * description: 将RDD通过java编程方式进行转换,生成DataFrame对象 <br>
 * date: 2020-3-28 <br>
 * author: Alex <br>
 * version: 1.0 <br>
 */
public class RDD2DataFrameByTxt {
    public static void main(String args[]) {
        //创建一个sparksession
        SparkSession spark = SparkSession
                .builder()
                .appName("RDD2DataFrameByTxt")
                .master("local")
                .getOrCreate();
        //读取文件,创建一个javaRDD,读取文件的textFile获取的是RDD方法,需要使用toJavaRDD,转换为javaRDD对象
        //这里的数据结构为  "yaohao",
        JavaRDD<String> lines = spark.sparkContext().textFile("/Users/yaohao/tools/spark-2.4.5-bin-hadoop2.7/examples/src/main/resources/people.txt", 1).toJavaRDD();
        //将一行数据转变为Row进行封装
        JavaRDD<Row> df = lines.map(line -> {
            String[] sp = line.split(",");
            return RowFactory.create(sp[0], Integer.parseInt(sp[1].trim()));
        });

        //数据的字段结构
        StructType rfSchema = new StructType(new StructField[]{
                new StructField("name", DataTypes.StringType, true, Metadata.empty()),
                new StructField("age", DataTypes.IntegerType, true, Metadata.empty())});

        //生成一个结构化后的数据
        Dataset<Row> rfDataset = spark.createDataFrame(df, rfSchema);
        //在结果集不增加的情况下,缓存RDD,方便后续进行查询以及分析
        rfDataset.cache();
        //可以理解为注册城一张表,支持后面采用sql方式进行查询
        rfDataset.registerTempTable("person");

        Dataset<Row> result = rfDataset.sqlContext().sql("select sum(age) from person where age>=18");
        result.show();
    }
}

相关文章

网友评论

      本文标题:Spark(txt类型文件转换为DataFrame进行sql查询

      本文链接:https://www.haomeiwen.com/subject/agmjuhtx.html