SparkSQL是spark家族中一个结构化或半结构化数据的处理模块。对SQL的处理跟关系型数据库SQL类似,将SQL解析成一棵树,通过规则的模式匹配,对树进行绑定、优化,得到查询结果。
SparkSQL提供了一种特殊的RDD-DataFrame,相当于关系型数据的一个表,在Java API中,由行(row)组成的数据集(DataSet)表示为一个DataFrame。
用户程序在执行过程中,下图表示了从SQL语句到DataFrame的整个执行过程。

SparkSession
在spark1.x时代,SparkSQL的入口都是通过SQLContext 或者HiveContext完成,从1.6以后,引入了SparkSession概念,替代了SQLContext,实现对数据的加载、转换、处理等工作。
可以通过SparkSession.builder来创建一个SparkSession,也可以通过stop停止
SparkSQL与MongoDB的集成
Mongo对Spark的支持可参见Mongo官方文档MongoDB
public static void main(String[] args) throws AnalysisException {
logger.info("开始执行告警统计spark任务");
SparkSession spark = SparkSession.builder()
.master("local")
.appName("alarmService")
.config("spark.mongodb.input.uri", MONGODB_INPUT_URL)
.config("spark.mongodb.output.uri", MONGODB_OUTPUT_URL)
.getOrCreate();
Dataset ds = MongoSpark.load(spark, EmAlarmBean.class);
ds.registerTempTable("test");
ds = spark.sql(getSql());
MongoSpark.save(ds);
}
private static String getSql() {
String sql = "select orgId,from_unixtime(createTimestamp, 'yyyyMMdd') AS statisticDate, " +
"sum(if(levelDictId='4001',1,0)) level1," +
"sum(if(levelDictId='4002',1,0)) level2," +
"sum(if(levelDictId='4003',1,0)) level3" +
" from test " +
" where (deviceType=0 or deviceType=2 or deviceType=3) and createTimestamp is not null and orgId is not null " +
" group by from_unixtime(createTimestamp, 'yyyyMMdd'),orgId";
return sql;
}
网友评论