spark读写数据仓库

作者: 藤风 | 来源:发表于2019-06-06 14:37 被阅读1次

1、使用场景

 随着业务及数据量的增长,数据库中的数据大致可以分为两类,一类为操作型数据,另一类为分析型数据。其中,操作型数据通常与日常业务紧密相关且可进行增删改查,而分析型数据通常为历史数据,用于统计分析,仅能查询不可增删改。此外,分析型数据有时需要对业务数据进行数据清洗得到。因此,可以将分析型数据导入数据仓库hive中,spark再定时从hive中取出数据进行分析。以城市空气质量预测为例,空气监测点分布在城市中的各个地方,定时地将数据上传至平台中,为了对城市空气质量进行预测,需定期将城市中各监测点的小时数据取平均值后存入hive中,spark再定期从hive中取出数据进行预测分析。

2、spark存入hive

 spark存入hive表有两种方式,一种调用方式DF.write.saveAsTable,另一种方式调用hiveContext.sql将数据导入hive中。首先,spark从数据库中读取原始数据并进行数据清洗,求出城市中所有点的平均值代码如下:

    mpInfoList = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("spark.mongodb.input.uri", MONITOR_POINT_INFO_URL) \
        .option("pipeline", matchCity) \
        .load().select("ID").rdd.map(lambda x: x.ID).collect()
    print(mpInfoList)
    airQualityData = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
        .option("spark.mongodb.input.uri", INPUT_URL)\
        .load()
    airQualityData = airQualityData.filter(airQualityData.NodeIdentifier.isin(mpInfoList))

    airQualityData = airQualityData.groupBy('ComponentTime')\
        .avg('pm25', 'temp', 'press', 'humi', 'wind_speed', 'wind_dir')\
        .orderBy(airQualityData.ComponentTime)\
        .withColumnRenamed('avg(pm25)', 'pm25')\
        .withColumnRenamed('avg(temp)', 'temp')\
        .withColumnRenamed('avg(press)', 'press')\
        .withColumnRenamed('avg(humi)', 'humi')\
        .withColumnRenamed('avg(wind_speed)', 'wind_speed')\
        .withColumnRenamed('avg(wind_dir)', 'wind_dir')

随后,将清洗后的数据存入hive中,代码如下:

data.write.saveAsTable("test.airData", None, "overwrite", None)

3、spark从hive中读取数据

调用sparkSession.sql从hive中读取数据,代码如下:

    data = spark.sql("select * from test.airData")
    data1 = data.orderBy(data.ComponentTime)

相关文章

网友评论

    本文标题:spark读写数据仓库

    本文链接:https://www.haomeiwen.com/subject/zigrxctx.html