美文网首页Spark
Spark从入门到入土(五):SparkSQL原理与实战

Spark从入门到入土(五):SparkSQL原理与实战

作者: 那些年搬过的砖 | 来源:发表于2019-07-11 17:07 被阅读0次

SparkSQL是spark家族中一个结构化或半结构化数据的处理模块。对SQL的处理跟关系型数据库SQL类似,将SQL解析成一棵树,通过规则的模式匹配,对树进行绑定、优化,得到查询结果。
SparkSQL提供了一种特殊的RDD-DataFrame,相当于关系型数据的一个表,在Java API中,由行(row)组成的数据集(DataSet)表示为一个DataFrame。
用户程序在执行过程中,下图表示了从SQL语句到DataFrame的整个执行过程。


SparkSession

在spark1.x时代,SparkSQL的入口都是通过SQLContext 或者HiveContext完成,从1.6以后,引入了SparkSession概念,替代了SQLContext,实现对数据的加载、转换、处理等工作。

可以通过SparkSession.builder来创建一个SparkSession,也可以通过stop停止

SparkSQL与MongoDB的集成

Mongo对Spark的支持可参见Mongo官方文档MongoDB

public static void main(String[] args) throws AnalysisException {
        logger.info("开始执行告警统计spark任务");

        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("alarmService")
                .config("spark.mongodb.input.uri", MONGODB_INPUT_URL)
                .config("spark.mongodb.output.uri", MONGODB_OUTPUT_URL)
                .getOrCreate();
        Dataset ds = MongoSpark.load(spark, EmAlarmBean.class);
        ds.registerTempTable("test");
        ds = spark.sql(getSql());
        MongoSpark.save(ds);
    }

    private static String getSql() {
        String sql = "select orgId,from_unixtime(createTimestamp, 'yyyyMMdd') AS statisticDate, " +
                "sum(if(levelDictId='4001',1,0)) level1," +
                "sum(if(levelDictId='4002',1,0)) level2," +
                "sum(if(levelDictId='4003',1,0)) level3" +
                " from test " +
                " where (deviceType=0 or deviceType=2 or deviceType=3) and createTimestamp is not null and orgId is not null " +
                " group by from_unixtime(createTimestamp, 'yyyyMMdd'),orgId";
        return sql;
    }

相关文章

网友评论

    本文标题:Spark从入门到入土(五):SparkSQL原理与实战

    本文链接:https://www.haomeiwen.com/subject/tfkdkctx.html