spark-structstreaming: reading data from Kafka

Author: 一个懒散的人 | Published 2020-11-23 16:20

Environment: PyCharm + Python 3.5 + kafka-2.11 + the ZooKeeper bundled with Kafka

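The spark-structstreaming examples online are almost all the same word count, and I have hardly seen a tutorial built on anything else, so here is a different one. Unlike word count, which simply sets the second column to 1 and sums it, this example defines its columns from the fields of the input string.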
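
The difference can be made concrete with a small pure-Python sketch (no Spark involved; the function names are hypothetical, not from the article): word count turns every token into `(token, 1)`, whereas here each record is split into a name and a numeric score. `to_double` roughly mimics Spark's string-to-double cast with ANSI mode off, where unparsable text becomes null (`None` here).

```python
def word_count_pairs(line):
    """Word-count style: every whitespace-separated token becomes (token, 1)."""
    return [(word, 1) for word in line.split()]


def to_double(text):
    """Roughly mimic CAST(text AS DOUBLE) with ANSI mode off: bad input -> None."""
    try:
        return float(text)
    except ValueError:
        return None


def name_score_pair(line):
    """This article's style: the record itself supplies both columns."""
    tokens = line.split(" ")      # like split(value, " ")
    name = tokens[0]              # like stu_rank[0]
    # like stu_rank[1]: a missing second field gives None instead of an error
    score = to_double(tokens[1]) if len(tokens) > 1 else None
    return name, score


print(word_count_pairs("ren 98"))  # [('ren', 1), ('98', 1)]
print(name_score_pair("ren 98"))   # ('ren', 98.0)
print(name_score_pair(""))         # ('', None)
```

The value being summed now comes from the data rather than being the constant 1.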

Requirement: compute each student's total score, grouped by name.

Input data:

>ren 98
>hong 99
>song 97
>li 98 
>ren 92
>
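
The expected totals can be worked out with plain Python first. This sketch (the helper `total_scores` is hypothetical, not part of the article's program) applies the same split, cast, sum, and null filter to the records above; treating the last, empty `>` line as an empty record is an assumption made for illustration.

```python
from collections import OrderedDict

# The sample input above, one Kafka record per line (last one assumed empty).
RECORDS = ["ren 98", "hong 99", "song 97", "li 98 ", "ren 92", ""]


def total_scores(records):
    """Sum scores per name, like groupBy("name").agg(sum("score")).

    Records with no usable score are skipped, which has the same effect on
    the final table as filter("score is not null") after the aggregation.
    """
    totals = OrderedDict()
    for line in records:
        tokens = line.split(" ")
        if len(tokens) < 2:
            continue  # no second field, so the score would be null
        try:
            score = float(tokens[1])
        except ValueError:
            continue  # non-numeric score, also null after the cast
        totals[tokens[0]] = totals.get(tokens[0], 0.0) + score
    return totals


for name, score in total_scores(RECORDS).items():
    print(name, score)
# ren 190.0
# hong 99.0
# song 97.0
# li 98.0
```

ren appears twice, so its total is 98 + 92 = 190.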

Code:

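# encoding=utf8
# Needs Spark's Kafka connector (spark-sql-kafka-0-10) on the classpath,
# e.g. via spark-submit --packages.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


if __name__ == "__main__":
    # local[*] runs Spark locally with one worker thread per core.
    spark = SparkSession.builder \
        .appName("kafka_pyspark_test") \
        .master("local[*]") \
        .getOrCreate()

    # Subscribe to topic "test". Kafka delivers key/value as binary,
    # so the value is cast to a string column named "value".
    lines = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "192.168.31.128:19092") \
        .option("subscribe", "test") \
        .load() \
        .selectExpr("CAST(value AS STRING) AS value")
    lines.printSchema()

    # "ren 98" -> ["ren", "98"]: element 0 is the name, element 1 the score.
    parts = lines.select(F.split(lines.value, " ").alias("stu_rank"))
    scores = parts.select(
        parts["stu_rank"][0].alias("name"),
        # Explicit cast; with ANSI mode off (the Spark 2.x/3.x default),
        # a missing or non-numeric score becomes null.
        parts["stu_rank"][1].cast("double").alias("score"))

    # Total score per student; totals that are null (e.g. from an empty
    # line) are dropped.
    totals = scores.groupBy("name").agg(F.sum("score").alias("score")) \
        .filter("score IS NOT NULL")

    # "complete" mode prints the whole result table after every micro-batch.
    query = totals.writeStream \
        .format("console") \
        .outputMode("complete") \
        .start()

    query.awaitTermination()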
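
The Kafka source is not bundled with PySpark itself, so Spark must be given the `spark-sql-kafka-0-10` connector. A minimal sketch of composing its Maven coordinate; the versions in the example call and the script name `kafka_score.py` are hypothetical. The Scala suffix must match the Scala version Spark was built with, which is independent of the `2.11` in the broker's `kafka-2.11` name (presumably the broker's own Scala build).

```python
# Sketch: compose the Maven coordinate of Spark's Kafka source connector.
# The versions used below are hypothetical examples; use your own.
def kafka_connector_coordinate(scala_version, spark_version):
    """Coordinate for org.apache.spark:spark-sql-kafka-0-10_<scala>:<spark>.

    scala_version is the Scala version Spark was built with, and
    spark_version should match the Spark version that runs the job.
    """
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(
        scala_version, spark_version)


coordinate = kafka_connector_coordinate("2.11", "2.4.7")
# Pass it on the command line (hypothetical script name) ...
print("spark-submit --packages {} kafka_score.py".format(coordinate))
# ... or set it in the builder before the session is created:
# SparkSession.builder.config("spark.jars.packages", coordinate)
```

`str.format` is used instead of f-strings so the sketch also runs on the Python 3.5 listed in the environment.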

Output:

-------------------------------------------
Batch: 5
-------------------------------------------
+----+-----+
|name|score|
+----+-----+
|hong| 99.0|
|song| 97.0|
|  li| 98.0|
| ren| 98.0|
+----+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+----+-----+
|name|score|
+----+-----+
|hong| 99.0|
|song| 97.0|
|  li| 98.0|
| ren|190.0|
+----+-----+
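
Complete mode is why every batch reprints all four students. A Spark-free simulation (the function `process_batch` is hypothetical, and it simplifies by assuming the first four records arrive in a single micro-batch) contrasting it with Spark's `update` output mode, which prints only the rows whose key received new data in the current trigger:

```python
def process_batch(state, batch, mode):
    """Fold one micro-batch of (name, score) pairs into `state` and
    return the rows a console sink would print for that trigger.

    complete: the whole result table, every trigger.
    update:   only rows whose key received new data in this trigger.
    """
    touched = []
    for name, score in batch:
        state[name] = state.get(name, 0.0) + score
        if name not in touched:
            touched.append(name)
    if mode == "complete":
        return dict(state)
    if mode == "update":
        return {name: state[name] for name in touched}
    raise ValueError("unsupported output mode: {}".format(mode))


# Simplification (assumption): the first four records arrive in one batch.
first = [("ren", 98.0), ("hong", 99.0), ("song", 97.0), ("li", 98.0)]
second = [("ren", 92.0)]

for mode in ("complete", "update"):
    state = {}
    process_batch(state, first, mode)
    print(mode, sorted(process_batch(state, second, mode).items()))
# complete [('hong', 99.0), ('li', 98.0), ('ren', 190.0), ('song', 97.0)]
# update [('ren', 190.0)]
```

Append mode, by contrast, is not accepted for this query, because it aggregates without an event-time watermark.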

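The second record for ren (`ren 92`) is processed in Batch 6, so ren's total rises from 98.0 to 98 + 92 = 190.0; because the output mode is complete, the unchanged rows are printed again as well.

ok!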


Original link: https://www.haomeiwen.com/subject/hcrviktx.html