美文网首页
spark python分析环境空气污染数据

spark python分析环境空气污染数据

作者: reco171 | 来源:发表于2019-01-18 10:05 被阅读0次

spark python分析环境空气污染数据
在测试中发现不能实现spark读取dataframe后,遍历dataframe元素,分别再spark read操作,这样会出现序列化(serialize)错误。所以只能分别通过spark read方式、psycopg2方式读取postgresql数据。
完整代码例子如下:

from pyspark.sql import SparkSession
import psycopg2

import os

sparkClassPath = os.getenv('SPARK_CLASSPATH', '/home/hadoop/spark-2.4.0-bin-hadoop2.6/jars/*')
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .master("local") \
    .config("spark.driver.extraClassPath", sparkClassPath) \
    .getOrCreate()
gradeDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://192.168.1.4:5432/testinfo") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "sjy_grade_threshhold") \
    .option("user", "postgres") \
    .option("password", "***") \
    .load()
gradeDF.cache
#psycopg2查询数据
def get_data(database_info, sql):
    conn = psycopg2.connect(database=database_info["database"],
                            user=database_info["user"],
                            password=database_info["password"],
                            host=database_info["host"],
                            port=database_info["port"])
    cur = conn.cursor()
    try:
        cur.execute(sql)
        # 获取表的所有字段名称
        coloumns = [row[0] for row in cur.description]
        result = [[str(item) for item in row] for row in cur.fetchall()]
        return [dict(zip(coloumns, row)) for row in result]
    except Exception as ex:
        print(ex)
    finally:
        conn.close()
#psycopg2更新数据
def update_data(database_info, sql):
    conn = psycopg2.connect(database=database_info["database"],
                            user=database_info["user"],
                            password=database_info["password"],
                            host=database_info["host"],
                            port=database_info["port"])
    cur = conn.cursor()
    try:
        cur.execute(sql)
        conn.commit()
        return cur.description
    except Exception as ex:
        print(ex)
    finally:
        conn.close()

# 数据库连接信息
database_info = {
    "database": "testinfo",
    "user": "postgres",
    "password": "***",
    "host": "192.168.1.4",
    "port": "5432"
}
#"so2_24", 151
def findBpIaqiByCp(pollt, cp):
    bpFiterStr = pollt + " >= " + str(cp)
    bpRow = gradeDF.select("grade", "iaqi", pollt).where(bpFiterStr).first()
    bpg = bpRow.grade - 1
    bpLFilterStr = "grade = " + str(bpg)
    bpLRow = gradeDF.select("grade", "iaqi", pollt).where(bpLFilterStr).first()
    bpIaqiDic = {"cp":cp, "bph":bpRow[pollt], "bpl":bpLRow[pollt], "iaqih":bpRow["iaqi"],"iaqil":bpLRow.iaqi}
    print("findBpIaqiByCp bpIaqiDic: ", bpIaqiDic)
    return bpIaqiDic


#具体计算污染物指数 IAQIP = (IAQIH - IAQIL)/(BPH - BPL)*(CP - BPL)+IAQIL
def computeiaqi(cp, bph, bpl, iaqih, iaqil):
    iaqip = (iaqih - iaqil) / (bph - bpl) * (float(cp) - bpl) + iaqil
    print("computeiaqi result cp : ", cp, ", iaqi: ", iaqip)
    return iaqip

#计算单项污染物指数"so2_24", 151
def computeIaqiByPolltCp(pollt, cp):
    if pollt == "pm2d5" and float(cp) > 500:
        cp = 500
    if pollt == "pm10" and float(cp) > 500:
        cp = 600
    bpIaqi = findBpIaqiByCp(pollt, cp)
    iaqip = computeiaqi(cp, bpIaqi["bph"],bpIaqi["bpl"],bpIaqi["iaqih"],bpIaqi["iaqil"])
    print("computeIaqiByPolltCp pollt: ", pollt, " ",iaqip)
    return iaqip
#spark read方式读取环境空气污染数据,暂时弃用该方法
airDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://192.168.1.4:5432/testinfo") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "sjy_air") \
    .option("user", "postgres") \
    .option("password", "***") \
    .load()

pollt_type = ("co", "no2", "o38h", "pm10", "pm2d5", "so2")

#stationname, date; co, no2, o3, pm10, pm2d5, so2,o38h
#aqi, firstpollt

###airDF.printSchema()
###airDF.select("co", "no2", "o38h").show()

#airDF.select("o38h").foreach(print)
#airDF.select("o38h").map(lambda p: print(p.o38h))
#airDF.map(lambda x: computeIaqiByPolltCp())
#该方法暂时弃用
def computeaqi(row):
    for each in pollt_type:
        print(row[each], row["date"], row["stationname"], each)
        #computeIaqiByPolltCp(each, row[each])
#airDF.foreach(computeaqi)
#选择具体的污染物,组sql
def sqljoin(table, pollut_type):
    sql = "select stationname, date"
    for each in pollt_type:
        sql = sql + ", " + each;
    sql = sql + " from " + table
    return sql

def main():
    sql = sqljoin("sjy_air", pollt_type)
    data = get_data(database_info, sql)
    list = []  ## 空列表
    updatesql = ""
    count = 0
    for item in data:
        print("for each count:", count, item["stationname"], item["date"])
        if count % 10 == 0 and count != 0:
            print("exec updatesql: ", updatesql, count)
            update_data(database_info, updatesql)
            updatesql = ""
        resdic = {"stationname": item["stationname"], "date": item["date"]}
        # each item
        polltvalueres = 0
        polltres = ""
        for each in pollt_type:
            print("for each pollt count: ", count, each, item[each])
            polltvalue = computeIaqiByPolltCp(each, item[each])
            if polltvalue >= polltvalueres:
                polltvalueres = polltvalue
                polltres = each
            resdic["firstpollt"] = polltres
            resdic["aqi"] = polltvalueres
        updatesql = updatesql + "update sjy_air set firstpollt = '" + polltres + "', aqi = " + str(
            round(polltvalueres, 2)) + " where date = '" + item["date"] + "' and stationname = '" + item[
                        "stationname"] + "';"
        count = count + 1
    if count % 10 != 0:
        print(updatesql, count)
        update_data(database_info, updatesql)

相关文章

网友评论

      本文标题:spark python分析环境空气污染数据

      本文链接:https://www.haomeiwen.com/subject/rekvdqtx.html