Analyzing Ambient Air Pollution Data with Spark and Python
During testing I found that you cannot read a DataFrame with spark.read and then, while iterating over its rows, issue another spark.read inside the iteration: the function shipped to the executors captures the driver-side SparkSession and a serialization (serialize) error is raised, as the sketch below illustrates. The PostgreSQL data therefore has to be read in two separate ways: via spark.read and via psycopg2.
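A minimal sketch of the problem, assuming `spark` and a loaded DataFrame `df` as in the code below (the `tbl` column used here is hypothetical):

# Fails with a serialization error: the lambda shipped to the executors
# captures the driver-side `spark` object, which cannot be pickled.
# df.foreach(lambda row: spark.read.format("jdbc").load())
#
# Works: bring the rows back to the driver with collect(), then issue the
# reads on the driver, where `spark` is directly available.
for row in df.collect():
    innerDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.1.4:5432/testinfo") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", row["tbl"]) \
        .option("user", "postgres") \
        .option("password", "***") \
        .load()

Collecting only works for small driving tables; for the air-quality data itself the post falls back to psycopg2 instead.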
The complete code example follows:
from pyspark.sql import SparkSession
import psycopg2
import os

# The PostgreSQL JDBC driver jar must be on the driver's classpath
sparkClassPath = os.getenv('SPARK_CLASSPATH', '/home/hadoop/spark-2.4.0-bin-hadoop2.6/jars/*')
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .master("local") \
    .config("spark.driver.extraClassPath", sparkClassPath) \
    .getOrCreate()
# Load the IAQI grade/breakpoint table from PostgreSQL via spark.read
gradeDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://192.168.1.4:5432/testinfo") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "sjy_grade_threshhold") \
    .option("user", "postgres") \
    .option("password", "***") \
    .load()
gradeDF.cache()  # note the parentheses: bare `gradeDF.cache` is a no-op
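To sanity-check the load before going further, the usual DataFrame inspection calls can be used (the output depends on your table contents):

gradeDF.printSchema()
gradeDF.show(5)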
# Query data via psycopg2
def get_data(database_info, sql):
    conn = psycopg2.connect(database=database_info["database"],
                            user=database_info["user"],
                            password=database_info["password"],
                            host=database_info["host"],
                            port=database_info["port"])
    cur = conn.cursor()
    try:
        cur.execute(sql)
        # Get all column names of the result set
        columns = [row[0] for row in cur.description]
        result = [[str(item) for item in row] for row in cur.fetchall()]
        # Return each row as a {column: value} dict
        return [dict(zip(columns, row)) for row in result]
    except Exception as ex:
        print(ex)
    finally:
        conn.close()
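For example, assuming the database_info dict defined below and the sjy_air table used throughout this post:

rows = get_data(database_info, "select stationname, date from sjy_air limit 3")
print(rows)  # a list of {"stationname": ..., "date": ...} dicts; all values are strings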
# Update data via psycopg2
def update_data(database_info, sql):
    conn = psycopg2.connect(database=database_info["database"],
                            user=database_info["user"],
                            password=database_info["password"],
                            host=database_info["host"],
                            port=database_info["port"])
    cur = conn.cursor()
    try:
        cur.execute(sql)
        conn.commit()
        # cursor.description is None for UPDATE statements;
        # rowcount reports how many rows the last statement touched
        return cur.rowcount
    except Exception as ex:
        print(ex)
    finally:
        conn.close()
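Since main() below assembles its UPDATE statements by string concatenation, it is worth noting that psycopg2 also supports parameterized queries, which avoid quoting bugs and SQL injection. A minimal sketch against the same table (the date and station values here are made up for illustration):

import psycopg2

conn = psycopg2.connect(database="testinfo", user="postgres",
                        password="***", host="192.168.1.4", port="5432")
cur = conn.cursor()
# %s placeholders let psycopg2 quote and escape the values itself
cur.execute(
    "update sjy_air set firstpollt = %s, aqi = %s "
    "where date = %s and stationname = %s",
    ("pm2d5", 87.5, "2018-01-01", "station01"))
conn.commit()
conn.close()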
# Database connection info
database_info = {
    "database": "testinfo",
    "user": "postgres",
    "password": "***",
    "host": "192.168.1.4",
    "port": "5432"
}
#"so2_24", 151
def findBpIaqiByCp(pollt, cp):
    # The lowest grade whose breakpoint is >= cp gives the high breakpoint (BPH);
    # ordering by grade makes first() deterministic
    bpFiterStr = pollt + " >= " + str(cp)
    bpRow = gradeDF.select("grade", "iaqi", pollt).where(bpFiterStr).orderBy("grade").first()
    # The previous grade gives the low breakpoint (BPL)
    bpg = bpRow.grade - 1
    bpLFilterStr = "grade = " + str(bpg)
    bpLRow = gradeDF.select("grade", "iaqi", pollt).where(bpLFilterStr).first()
    bpIaqiDic = {"cp": cp, "bph": bpRow[pollt], "bpl": bpLRow[pollt],
                 "iaqih": bpRow["iaqi"], "iaqil": bpLRow.iaqi}
    print("findBpIaqiByCp bpIaqiDic: ", bpIaqiDic)
    return bpIaqiDic
# Compute the pollutant index: IAQIP = (IAQIH - IAQIL) / (BPH - BPL) * (CP - BPL) + IAQIL
def computeiaqi(cp, bph, bpl, iaqih, iaqil):
    iaqip = (iaqih - iaqil) / (bph - bpl) * (float(cp) - bpl) + iaqil
    print("computeiaqi result cp : ", cp, ", iaqi: ", iaqip)
    return iaqip
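As a worked check, assume sjy_grade_threshhold holds the standard HJ 633-2012 breakpoints, where the 24-hour SO2 segment containing cp = 151 has BPL = 150, BPH = 475, IAQIL = 100, IAQIH = 150:

# computeiaqi(151, 475, 150, 150, 100)
# = (150 - 100) / (475 - 150) * (151 - 150) + 100
# = 50 / 325 * 1 + 100
# ≈ 100.15

So an SO2 24-hour concentration of 151 should yield an IAQI of roughly 100.15.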
# Compute the individual pollutant index, e.g. computeIaqiByPolltCp("so2_24", 151)
def computeIaqiByPolltCp(pollt, cp):
    # Clamp concentrations to the top of the breakpoint table
    # (PM2.5 tops out at 500, PM10 at 600 in the grade table)
    if pollt == "pm2d5" and float(cp) > 500:
        cp = 500
    if pollt == "pm10" and float(cp) > 600:
        cp = 600
    bpIaqi = findBpIaqiByCp(pollt, cp)
    iaqip = computeiaqi(cp, bpIaqi["bph"], bpIaqi["bpl"], bpIaqi["iaqih"], bpIaqi["iaqil"])
    print("computeIaqiByPolltCp pollt: ", pollt, " ", iaqip)
    return iaqip
# Read the air pollution data via spark.read; this path is abandoned for now because of the serialization issue described above
airDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://192.168.1.4:5432/testinfo") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "sjy_air") \
    .option("user", "postgres") \
    .option("password", "***") \
    .load()
pollt_type = ("co", "no2", "o38h", "pm10", "pm2d5", "so2")
# Columns in sjy_air: stationname, date; co, no2, o3, pm10, pm2d5, so2, o38h;
# output columns: aqi, firstpollt
###airDF.printSchema()
###airDF.select("co", "no2", "o38h").show()
#airDF.select("o38h").foreach(print)
#airDF.select("o38h").map(lambda p: print(p.o38h))
#airDF.map(lambda x: computeIaqiByPolltCp())
# This method is abandoned for now: foreach ships computeaqi to the executors,
# where calling back into spark would hit the serialization error
def computeaqi(row):
    for each in pollt_type:
        print(row[each], row["date"], row["stationname"], each)
        #computeIaqiByPolltCp(each, row[each])
#airDF.foreach(computeaqi)
# Pick the pollutant columns and build the SELECT statement
def sqljoin(table, pollt_type):
    sql = "select stationname, date"
    for each in pollt_type:
        sql = sql + ", " + each
    sql = sql + " from " + table
    return sql
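With the pollt_type tuple defined above, the generated statement is:

print(sqljoin("sjy_air", pollt_type))
# select stationname, date, co, no2, o38h, pm10, pm2d5, so2 from sjy_air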
def main():
    sql = sqljoin("sjy_air", pollt_type)
    data = get_data(database_info, sql)
    updatesql = ""
    count = 0
    for item in data:
        print("for each count:", count, item["stationname"], item["date"])
        # Flush the accumulated UPDATE statements every 10 rows
        if count % 10 == 0 and count != 0:
            print("exec updatesql: ", updatesql, count)
            update_data(database_info, updatesql)
            updatesql = ""
        resdic = {"stationname": item["stationname"], "date": item["date"]}
        # Compute the IAQI of every pollutant; the maximum becomes the AQI
        # and the pollutant that produced it is the primary pollutant
        polltvalueres = 0
        polltres = ""
        for each in pollt_type:
            print("for each pollt count: ", count, each, item[each])
            polltvalue = computeIaqiByPolltCp(each, item[each])
            if polltvalue >= polltvalueres:
                polltvalueres = polltvalue
                polltres = each
        resdic["firstpollt"] = polltres
        resdic["aqi"] = polltvalueres
        updatesql = updatesql + "update sjy_air set firstpollt = '" + polltres + "', aqi = " + str(
            round(polltvalueres, 2)) + " where date = '" + item["date"] + "' and stationname = '" + item[
            "stationname"] + "';"
        count = count + 1
    # Flush whatever is left; checking updatesql rather than count % 10
    # also covers the case where the row count is an exact multiple of 10
    if updatesql != "":
        print(updatesql, count)
        update_data(database_info, updatesql)
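The listing as published never calls main(); adding the standard entry-point guard makes the script runnable directly:

if __name__ == "__main__":
    main()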