from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
# Only show error-level log output
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
def run(outfile):
    # Load the tab-separated source file
    origin_data = sc.textFile("filepath").map(lambda x: x.split("\t"))
    first = origin_data.first()
    # Drop the header row
    whole = origin_data.filter(lambda x: x != first)
    # Keep rows where column 3 == 3, column 2 <= 100 and user_id is non-zero,
    # then build Row objects for the DataFrame
    course_order = whole.map(lambda x: (int(x[0]), int(x[1]), int(x[2]), int(x[3]))) \
        .filter(lambda x: x[3] == 3) \
        .filter(lambda x: x[2] <= 100) \
        .filter(lambda x: x[0] != 0) \
        .map(lambda x: Row(user_id=x[0], num=x[1]))
    # Group by user, sum num, and rename the aggregate column back to "num"
    out = sqlContext.createDataFrame(course_order) \
        .groupBy("user_id") \
        .agg({"num": "sum"}) \
        .withColumnRenamed("sum(num)", "num")
    # Write the result as a single CSV file without a header
    out.repartition(1).write.format("csv").option("header", "false").mode("append").save(outfile)
    # Write one SQL statement per row into a single text file; `sq` is assumed to be
    # a %-format SQL template string (shard suffix, num, user_id) defined elsewhere
    out.rdd.map(lambda x: sq % (x['user_id'] % 10, x['num'], x['user_id'])).repartition(1).saveAsTextFile('sql.csv')
if __name__ == '__main__':
run("out")