Analyzing Ambient Air Pollution Data with Spark and Python

Author: reco171 | Published 2019-01-18 10:05

    During testing I found that after reading a DataFrame with Spark you cannot iterate over its rows and issue further spark.read (or other DataFrame) operations inside the loop: doing so raises a serialization error, because Spark cannot serialize the SparkSession/DataFrame objects to the executors. The data therefore has to be read from PostgreSQL through two separate channels: the grade-threshold table via spark.read, and the air-quality records via psycopg2.
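
    The failing pattern looks roughly like the sketch below (illustrative only, using the gradeDF and airDF defined in the full script):

    # Anti-pattern: the function passed to foreach() runs on the executors,
    # but it closes over gradeDF, which only exists on the driver. PySpark
    # cannot pickle a DataFrame/SparkSession, so the job fails with a
    # serialization (pickling) error before any row is processed.
    def lookup_threshold(row):
        return gradeDF.where("grade = 1").first()  # driver-side object used on an executor

    airDF.foreach(lookup_threshold)  # -> serialization error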
    The complete code example follows:

    from pyspark.sql import SparkSession
    import psycopg2
    
    import os
    
    sparkClassPath = os.getenv('SPARK_CLASSPATH', '/home/hadoop/spark-2.4.0-bin-hadoop2.6/jars/*')
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .master("local") \
        .config("spark.driver.extraClassPath", sparkClassPath) \
        .getOrCreate()
    gradeDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.1.4:5432/testinfo") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", "sjy_grade_threshhold") \
        .option("user", "postgres") \
        .option("password", "***") \
        .load()
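    # Note: spark.read.format("jdbc") requires the PostgreSQL JDBC driver jar on the
    # classpath configured above via spark.driver.extraClassPath (e.g. a
    # postgresql-42.x.jar in the jars directory; the exact version is an assumption).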
    gradeDF.cache()  # cache the small threshold table; it is queried repeatedly below
    # query data via psycopg2
    def get_data(database_info, sql):
        conn = psycopg2.connect(database=database_info["database"],
                                user=database_info["user"],
                                password=database_info["password"],
                                host=database_info["host"],
                                port=database_info["port"])
        cur = conn.cursor()
        try:
            cur.execute(sql)
            # get all column names of the result set
            columns = [row[0] for row in cur.description]
            result = [[str(item) for item in row] for row in cur.fetchall()]
            return [dict(zip(columns, row)) for row in result]
        except Exception as ex:
            print(ex)
        finally:
            conn.close()
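    # Example (illustrative values): get_data(database_info, "select stationname, date from sjy_air limit 1")
    # returns something like [{"stationname": "station1", "date": "2019-01-01"}]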
    # update data via psycopg2
    def update_data(database_info, sql):
        conn = psycopg2.connect(database=database_info["database"],
                                user=database_info["user"],
                                password=database_info["password"],
                                host=database_info["host"],
                                port=database_info["port"])
        cur = conn.cursor()
        try:
            cur.execute(sql)
            conn.commit()
            return cur.rowcount  # number of rows affected (cur.description is None for UPDATE)
        except Exception as ex:
            print(ex)
        finally:
            conn.close()
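    # Note: updatesql below may hold several semicolon-separated UPDATE statements;
    # psycopg2 sends the whole string to PostgreSQL in a single execute() call.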
    
    # database connection info
    database_info = {
        "database": "testinfo",
        "user": "postgres",
        "password": "***",
        "host": "192.168.1.4",
        "port": "5432"
    }
    #"so2_24", 151
    def findBpIaqiByCp(pollt, cp):
        bpFilterStr = pollt + " >= " + str(cp)
        bpRow = gradeDF.select("grade", "iaqi", pollt).where(bpFilterStr).first()
        bpg = bpRow.grade - 1
        bpLFilterStr = "grade = " + str(bpg)
        bpLRow = gradeDF.select("grade", "iaqi", pollt).where(bpLFilterStr).first()
        bpIaqiDic = {"cp":cp, "bph":bpRow[pollt], "bpl":bpLRow[pollt], "iaqih":bpRow["iaqi"],"iaqil":bpLRow.iaqi}
        print("findBpIaqiByCp bpIaqiDic: ", bpIaqiDic)
        return bpIaqiDic
    
    
    # compute the individual pollutant index: IAQIP = (IAQIH - IAQIL)/(BPH - BPL)*(CP - BPL) + IAQIL
    def computeiaqi(cp, bph, bpl, iaqih, iaqil):
        iaqip = (iaqih - iaqil) / (bph - bpl) * (float(cp) - bpl) + iaqil
        print("computeiaqi result cp : ", cp, ", iaqi: ", iaqip)
        return iaqip
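    # Worked example (breakpoints assumed from the HJ 633-2012 SO2 24h table,
    # where 150 ug/m3 -> IAQI 100 and 475 ug/m3 -> IAQI 150):
    # computeiaqi(151, 475, 150, 150, 100) = 50/325 * 1 + 100 ≈ 100.15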
    
    # compute a single pollutant's sub-index, e.g. pollt="so2_24", cp=151
    def computeIaqiByPolltCp(pollt, cp):
        if pollt == "pm2d5" and float(cp) > 500:
            cp = 500
        if pollt == "pm10" and float(cp) > 500:
            cp = 600
        bpIaqi = findBpIaqiByCp(pollt, cp)
        iaqip = computeiaqi(cp, bpIaqi["bph"],bpIaqi["bpl"],bpIaqi["iaqih"],bpIaqi["iaqil"])
        print("computeIaqiByPolltCp pollt: ", pollt, " ",iaqip)
        return iaqip
    # read the ambient air pollution data via spark.read (this path is abandoned for now; see psycopg2 above)
    airDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.1.4:5432/testinfo") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", "sjy_air") \
        .option("user", "postgres") \
        .option("password", "***") \
        .load()
    
    pollt_type = ("co", "no2", "o38h", "pm10", "pm2d5", "so2")
    
    # input columns: stationname, date; co, no2, o3, pm10, pm2d5, so2, o38h
    # output columns: aqi, firstpollt
    
    ###airDF.printSchema()
    ###airDF.select("co", "no2", "o38h").show()
    
    #airDF.select("o38h").foreach(print)
    #airDF.select("o38h").map(lambda p: print(p.o38h))
    #airDF.map(lambda x: computeIaqiByPolltCp())
    # this function is currently unused
    def computeaqi(row):
        for each in pollt_type:
            print(row[each], row["date"], row["stationname"], each)
            #computeIaqiByPolltCp(each, row[each])
    #airDF.foreach(computeaqi)
    # build the SQL selecting the chosen pollutant columns
    def sqljoin(table, pollut_type):
        sql = "select stationname, date"
        for each in pollut_type:
            sql = sql + ", " + each
        sql = sql + " from " + table
        return sql
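    # e.g. sqljoin("sjy_air", pollt_type)
    # -> "select stationname, date, co, no2, o38h, pm10, pm2d5, so2 from sjy_air"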
    
    def main():
        sql = sqljoin("sjy_air", pollt_type)
        data = get_data(database_info, sql)
        updatesql = ""
        count = 0
        for item in data:
            print("for each count:", count, item["stationname"], item["date"])
            # flush the accumulated updates every 10 rows
            if count % 10 == 0 and count != 0:
                print("exec updatesql: ", updatesql, count)
                update_data(database_info, updatesql)
                updatesql = ""
            resdic = {"stationname": item["stationname"], "date": item["date"]}
            # each item
            polltvalueres = 0
            polltres = ""
            for each in pollt_type:
                print("for each pollt count: ", count, each, item[each])
                polltvalue = computeIaqiByPolltCp(each, item[each])
                if polltvalue >= polltvalueres:
                    polltvalueres = polltvalue
                    polltres = each
            # record the dominant pollutant and the overall AQI for this row
            resdic["firstpollt"] = polltres
            resdic["aqi"] = polltvalueres
            updatesql = updatesql + "update sjy_air set firstpollt = '" + polltres + "', aqi = " + str(
                round(polltvalueres, 2)) + " where date = '" + item["date"] + "' and stationname = '" + item[
                            "stationname"] + "';"
            count = count + 1
        # flush whatever remains in the final partial batch
        if updatesql:
            print(updatesql, count)
            update_data(database_info, updatesql)
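
    # run the pipeline when executed as a script
    if __name__ == "__main__":
        main()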
    
