Spark Structured Streaming: Reading Data from Kafka


Author: 一个懒散的人 | Published 2020-11-23 16:20

    Environment: PyCharm + Python 3.5 + kafka-2.11 + the ZooKeeper bundled with Kafka

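    Most Spark Structured Streaming examples online are the same word count, and I have hardly seen one built on anything else, so here is a different one. Word count simply pairs every word with a constant 1 and adds up the 1s. This example instead splits each input string and defines its columns freely: here, a name column and a score column.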

    Requirement: compute each student's total score, grouped by name.

    Input data (one record per line, typed at the `>` prompt of the Kafka console producer):

    >ren 98
    >hong 99
    >song 97
    >li 98 
    >ren 92
    >
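
    To make the difference from word count concrete, the per-line logic can be modelled in plain Python without Spark or Kafka. This is a sketch under assumptions: each Kafka message value is one line such as `ren 98`, and the helper names `word_count`, `parse_record` and `total_scores` are hypothetical. Word count turns every token into a count of 1. Here the first token becomes the name and the second is converted to a numeric score. A line without a usable score (such as the final empty `>` line) contributes only a null, which the Spark job drops with `filter("score is not null")`.

```python
# Plain-Python sketch of the per-line logic (no Spark/Kafka needed).
# Assumption: each Kafka message value is one line such as "ren 98".
from typing import Dict, Iterable, Optional, Tuple


def word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Classic word count: every whitespace-separated token counts as 1."""
    counts: Dict[str, int] = {}
    for line in lines:
        for token in line.split():
            counts[token] = counts.get(token, 0) + 1
    return counts


def parse_record(line: str) -> Tuple[str, Optional[float]]:
    """Mirror split(value, " "): element 0 is the name, element 1 the score.

    A missing or non-numeric score becomes None, roughly as Spark yields null.
    """
    parts = line.split(" ")
    if len(parts) < 2:
        return parts[0], None
    try:
        return parts[0], float(parts[1])
    except ValueError:
        return parts[0], None


def total_scores(lines: Iterable[str]) -> Dict[str, float]:
    """Sum scores per name, ignoring nulls like SQL sum(), then drop null totals."""
    sums: Dict[str, Optional[float]] = {}
    for line in lines:
        name, score = parse_record(line)
        if score is None:
            sums.setdefault(name, None)  # group exists, but its sum stays null
        else:
            previous = sums.get(name)
            sums[name] = score if previous is None else previous + score
    # Equivalent of filter("score is not null") on the aggregated result
    return {name: total for name, total in sums.items() if total is not None}


if __name__ == "__main__":
    sample = ["ren 98", "hong 99", "song 97", "li 98 ", "ren 92", ""]
    print(total_scores(sample))
    # {'ren': 190.0, 'hong': 99.0, 'song': 97.0, 'li': 98.0}
```

    The trailing space in `li 98 ` is harmless because only elements 0 and 1 of the split are used.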
    

    Program code:

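    # encoding=utf8
    # The Kafka source needs the spark-sql-kafka-0-10 connector on the classpath
    # (for example via spark-submit --packages); its version must match your Spark build.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F


    if __name__ == "__main__":
        spark = SparkSession.builder \
            .appName("kafka_pyspark_test") \
            .master("local") \
            .getOrCreate()

        # Subscribe to topic "test"; Kafka delivers value as binary, so cast it to a string
        lines = spark.readStream.format("kafka") \
            .option("kafka.bootstrap.servers", "192.168.31.128:19092") \
            .option("subscribe", "test") \
            .load() \
            .selectExpr("CAST(value AS STRING)")
        lines.printSchema()

        # "ren 98" -> ["ren", "98"]
        parts = lines.select(F.split(lines.value, " ").alias("stu_rank"))
        # Element 0 is the name, element 1 the score (cast explicitly to double)
        scores = parts.select(parts["stu_rank"][0].alias("name"),
                              parts["stu_rank"][1].cast("double").alias("score"))
        # Running total per name
        totals = scores.groupBy("name").agg(F.sum("score").alias("score"))
        # An empty input line yields a null score; drop such groups
        result = totals.filter("score is not null")

        # complete mode reprints the whole aggregated table after every micro-batch
        query = result.writeStream \
            .format("console") \
            .outputMode("complete") \
            .start()

        query.awaitTermination()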
    

    Output (console sink, two consecutive micro-batches):

    Batch: 5
    -------------------------------------------
    +----+-----+
    |name|score|
    +----+-----+
    |hong| 99.0|
    |song| 97.0|
    |  li| 98.0|
    | ren| 98.0|
    +----+-----+
    
    -------------------------------------------
    Batch: 6
    -------------------------------------------
    +----+-----+
    |name|score|
    +----+-----+
    |hong| 99.0|
    |song| 97.0|
    |  li| 98.0|
    | ren|190.0|
    +----+-----+
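
    These tables come from `outputMode("complete")`: the query keeps a running total per name as state and reprints the whole result table after every micro-batch, which is why `hong`, `song` and `li` appear again in Batch 6 even though only `ren`'s total changed. The plain-Python sketch below models that behaviour. The function name `run_micro_batches` is hypothetical, each batch is assumed to hold already-parsed `(name, score)` rows, and rows are sorted only to make the result deterministic (Spark does not guarantee row order). For comparison it also models `update` mode, which writes only the rows whose aggregate changed in that batch.

```python
# Plain-Python sketch of a streaming aggregation written to a console sink.
# Assumption: each batch holds already-parsed (name, score) rows.
from typing import Dict, Iterable, List, Set, Tuple

Row = Tuple[str, float]


def run_micro_batches(batches: Iterable[List[Row]], mode: str = "complete") -> List[List[Row]]:
    """Return the rows the sink would print after each micro-batch."""
    if mode not in ("complete", "update"):
        raise ValueError("mode must be 'complete' or 'update'")
    state: Dict[str, float] = {}  # running total per name, kept across batches
    printed: List[List[Row]] = []
    for batch in batches:
        changed: Set[str] = set()
        for name, score in batch:
            state[name] = state.get(name, 0.0) + score
            changed.add(name)
        # complete: whole result table; update: only groups changed in this batch
        names: Iterable[str] = state.keys() if mode == "complete" else changed
        # Sorted only for a deterministic result; Spark does not promise an order
        printed.append(sorted((name, state[name]) for name in names))
    return printed


if __name__ == "__main__":
    batches = [[("ren", 98.0)], [("hong", 99.0)], [("song", 97.0)],
               [("li", 98.0)], [("ren", 92.0)]]
    for table in run_micro_batches(batches)[-2:]:
        print(table)
    # [('hong', 99.0), ('li', 98.0), ('ren', 98.0), ('song', 97.0)]
    # [('hong', 99.0), ('li', 98.0), ('ren', 190.0), ('song', 97.0)]
```

    With the sample input fed one line per batch, the last two complete-mode tables match Batch 5 and Batch 6 up to row order, while update mode would print only the `ren` row for the last batch.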
    

    ok!

    Article link: https://www.haomeiwen.com/subject/hcrviktx.html