美文网首页Spark
Spark从入门到入土(五):SparkSQL原理与实战

Spark从入门到入土(五):SparkSQL原理与实战

作者: 那些年搬过的砖 | 来源:发表于2019-07-11 17:07 被阅读0次

    SparkSQL是spark家族中一个结构化或半结构化数据的处理模块。对SQL的处理跟关系型数据库SQL类似,将SQL解析成一棵树,通过规则的模式匹配,对树进行绑定、优化,得到查询结果。
    SparkSQL提供了一种特殊的RDD-DataFrame,相当于关系型数据的一个表,在Java API中,由行(row)组成的数据集(DataSet)表示为一个DataFrame。
    用户程序在执行过程中,下图表示了从SQL语句到DataFrame的整个执行过程。


    SparkSession

    在spark1.x时代,SparkSQL的入口都是通过SQLContext 或者HiveContext完成,从1.6以后,引入了SparkSession概念,替代了SQLContext,实现对数据的加载、转换、处理等工作。

    可以通过SparkSession.builder来创建一个SparkSession,也可以通过stop停止

    SparkSQL与MongoDB的集成

    Mongo对Spark的支持可参见Mongo官方文档MongoDB

    public static void main(String[] args) throws AnalysisException {
            logger.info("开始执行告警统计spark任务");
    
            SparkSession spark = SparkSession.builder()
                    .master("local")
                    .appName("alarmService")
                    .config("spark.mongodb.input.uri", MONGODB_INPUT_URL)
                    .config("spark.mongodb.output.uri", MONGODB_OUTPUT_URL)
                    .getOrCreate();
            Dataset ds = MongoSpark.load(spark, EmAlarmBean.class);
            ds.registerTempTable("test");
            ds = spark.sql(getSql());
            MongoSpark.save(ds);
        }
    
        private static String getSql() {
            String sql = "select orgId,from_unixtime(createTimestamp, 'yyyyMMdd') AS statisticDate, " +
                    "sum(if(levelDictId='4001',1,0)) level1," +
                    "sum(if(levelDictId='4002',1,0)) level2," +
                    "sum(if(levelDictId='4003',1,0)) level3" +
                    " from test " +
                    " where (deviceType=0 or deviceType=2 or deviceType=3) and createTimestamp is not null and orgId is not null " +
                    " group by from_unixtime(createTimestamp, 'yyyyMMdd'),orgId";
            return sql;
        }
    

    相关文章

      网友评论

        本文标题:Spark从入门到入土(五):SparkSQL原理与实战

        本文链接:https://www.haomeiwen.com/subject/tfkdkctx.html