Environment: PyCharm + Python 3.5 + kafka-2.11 + Kafka's bundled ZooKeeper
The Spark Structured Streaming examples online are almost all the same word-count demo; I have rarely seen it taught with anything else, so this post walks through a different one. The difference from word count is that we define columns from the input string however we like, instead of simply setting the second column to 1 and summing it up:
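To make the contrast concrete, here is a minimal batch-mode sketch (no Kafka involved; the app name and inlined sample rows are just placeholders): the word-count pattern produces one row per token and counts rows, while this article's pattern splits each line into named columns and sums the value column.
# encoding=utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, sum as sum_

spark = SparkSession.builder.master("local").appName("contrast").getOrCreate()
# inlined sample rows standing in for the Kafka messages
df = spark.createDataFrame([("ren 98",), ("ren 92",)], ["value"])

# word-count pattern: one row per token, then count rows per token
df.select(explode(split(df.value, " ")).alias("word"))\
    .groupBy("word").count().show()

# this article's pattern: split into named columns, then sum the value column
cols = df.select(split(df.value, " ").alias("stu_rank"))
cols.select(cols["stu_rank"][0].alias("name"),
            cols["stu_rank"][1].cast("double").alias("score"))\
    .groupBy("name").agg(sum_("score").alias("score")).show()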
Requirement: sum each student's scores, grouped by name
Input data:
>ren 98
>hong 99
>song 97
>li 98
>ren 92
>
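The > prompts above come from a Kafka console producer; assuming the Kafka scripts are on hand, the test topic can be fed with something like:
bin/kafka-console-producer.sh --broker-list 192.168.31.128:19092 --topic test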
Program code:
# encoding=utf8
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, sum as sum_

if __name__ == "__main__":
    sc = SparkContext(appName="kafka_pyspark_test", master="local")
    spark = SparkSession(sc)
    # the Kafka source exposes key/value as binary, so cast value to string
    lines = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers", "192.168.31.128:19092")\
        .option("subscribe", "test")\
        .load()\
        .selectExpr("cast(value as string)")
    lines.printSchema()
    # split each "name score" line into an array column
    wc = lines.select(split(lines.value, " ").alias("stu_rank"))
    # pull the array elements out into named columns
    xx = wc.select(wc["stu_rank"][0].alias("name"),
                   wc["stu_rank"][1].alias("score"))
    # sum per student; Spark casts the string scores to double for sum()
    yy = xx.groupBy("name").agg(sum_("score").alias("score"))
    dd = yy.filter("score is not null")
    # complete mode re-emits the whole aggregated table on every batch
    query = dd\
        .writeStream\
        .format("console")\
        .outputMode("complete")\
        .start()
    query.awaitTermination()
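One thing to watch: the Kafka source is not part of plain PySpark, so the job has to be launched with the spark-sql-kafka package on the classpath. The version below is an assumption (match it to your Spark/Scala build) and the script name is just a placeholder:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 kafka_pyspark_test.py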
Output:
-------------------------------------------
Batch: 5
-------------------------------------------
+----+-----+
|name|score|
+----+-----+
|hong| 99.0|
|song| 97.0|
| li| 98.0|
| ren| 98.0|
+----+-----+
-------------------------------------------
Batch: 6
-------------------------------------------
+----+-----+
|name|score|
+----+-----+
|hong| 99.0|
|song| 97.0|
| li| 98.0|
| ren|190.0|
+----+-----+
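Because the query runs in complete mode, every batch re-prints the full aggregated table: once the second ren record arrives, ren's total goes from 98.0 to 98 + 92 = 190.0.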
ok!