Processing Data with Kafka Streams


Author: CNSTT | Published 2018-12-17 15:42

    Preface:

    This article shows how to use Kafka Streams on Windows to process data, counting how many times each word occurs.
    Official demo

    Source code

    WordCountDemo

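    // Imports (at the top of the source file)
    import java.util.Arrays;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    // Builder that defines the processing topology
    final StreamsBuilder builder = new StreamsBuilder();

    // Serializers/deserializers (serde) for String and Long types
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    // Construct a `KStream` from the input topic "streams-plaintext-input", where message values
    // represent lines of text (for the sake of this example, we ignore whatever may be stored
    // in the message keys).
    final KStream<String, String> textLines = builder.stream("streams-plaintext-input",
        Consumed.with(stringSerde, stringSerde));

    final KTable<String, Long> wordCounts = textLines
        // Lower-case each line and split it into words on runs of non-word characters (\W+).
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

        // Use each word as the new message key. With no serde argument, this
        // repartition step uses the default serdes from the application's StreamsConfig.
        .groupBy((key, value) -> value)

        // Count the occurrences of each word (message key).
        .count();

    // Store the running counts as a changelog stream to the output topic.
    wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));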
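    To see what this topology computes without a running broker, here is a JDK-only sketch. It is not Kafka Streams code: the class WordCountSketch, its Update type and its methods are hypothetical names for illustration. Each word occurrence produces an updated (word, count) pair, the kind of update wordCounts.toStream() forwards to streams-wordcount-output. Unlike the demo, the sketch drops the empty token that split("\\W+") returns when a line starts with whitespace or punctuation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the WordCountDemo topology (no Kafka dependency).
public class WordCountSketch {

    // One (word, running count) update, like a record on streams-wordcount-output.
    public static final class Update {
        public final String word;
        public final long count;

        Update(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + "\t" + count;
        }
    }

    // Running totals; plays the role of the state store behind count().
    private final Map<String, Long> counts = new HashMap<>();

    // Like flatMapValues(...): lower-case the line and split on runs of non-word characters.
    // Unlike the demo, empty tokens (from leading whitespace or punctuation) are dropped.
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.toLowerCase().split("\\W+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    // Like groupBy((key, value) -> value).count() followed by toStream():
    // one updated count per word occurrence, in input order.
    public List<Update> process(String line) {
        List<Update> updates = new ArrayList<>();
        for (String word : tokenize(line)) {
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(new Update(word, updated));
        }
        return updates;
    }

    public static void main(String[] args) {
        WordCountSketch app = new WordCountSketch();
        for (String line : Arrays.asList("all streams lead to kafka", "hello kafka streams")) {
            for (Update u : app.process(line)) {
                System.out.println(u);
            }
        }
    }
}
```

    The real application may not emit every intermediate count: Kafka Streams buffers KTable updates in a record cache and flushes them on commit, so the output topic can show only the latest count per word for each flush.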
    

    Walkthrough

    Start Kafka. The Kafka startup steps are omitted here; see the linked article.
    Change to the directory D:\com\kafka_2.11-2.0.1\bin\windows
    Create the topic streams-plaintext-input

    kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
    

    Create the topic streams-wordcount-output (log-compacted)

    kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact
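    Creating streams-wordcount-output with cleanup.policy=compact suits this data: each record carries the newest count for its word, so the broker may discard older counts for the same key. The JDK-only sketch below models only the end result of compaction, using a hypothetical CompactionSketch.compact method; it is not the broker's algorithm and ignores segment handling, tombstones and timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of what a fully compacted topic retains: the last record per key.
public class CompactionSketch {

    // A (word, count) record as it sits in the output topic's log.
    public static final class Rec {
        public final String key;
        public final long value;

        public Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // Keep only the last record for each key, preserving log order among the survivors.
    public static List<Rec> compact(List<Rec> log) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastIndex.put(log.get(i).key, i);
        }
        List<Rec> kept = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (lastIndex.get(log.get(i).key) == i) {
                kept.add(log.get(i));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> log = Arrays.asList(
            new Rec("kafka", 1), new Rec("streams", 1),
            new Rec("kafka", 2), new Rec("streams", 2), new Rec("hello", 1));
        System.out.println(compact(log));
    }
}
```

    Because the broker compacts lazily in the background, a consumer reading with --from-beginning can still see superseded counts.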
    

    Start the application (the WordCountDemo example that ships with the Kafka distribution)

    kafka-run-class.bat org.apache.kafka.streams.examples.wordcount.WordCountDemo
    

    Start a console producer (used later to type input)

    kafka-console-producer.bat --broker-list localhost:9092 --topic streams-plaintext-input
    

    Start consumer 1 to watch the word counts

    kafka-console-consumer.bat --bootstrap-server localhost:9092 ^
    --topic streams-wordcount-output ^
    --from-beginning ^
    --formatter kafka.tools.DefaultMessageFormatter ^
    --property print.key=true ^
    --property print.value=true ^
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer ^
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
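    The value.deserializer property matters because Kafka's LongSerializer writes each count as 8 bytes in big-endian order, not as text; printed as a string, those bytes would typically show up as unreadable characters. The JDK-only sketch below mirrors that encoding with java.nio.ByteBuffer. LongValueSketch and its methods are hypothetical names, and the real LongDeserializer throws a Kafka SerializationException rather than the IllegalArgumentException used here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the 8-byte big-endian encoding used for the Long counts.
public class LongValueSketch {

    // Encode like LongSerializer: 8 bytes, big-endian (ByteBuffer's default byte order).
    public static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode like LongDeserializer: null stays null, otherwise exactly 8 bytes are expected.
    public static Long decode(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(2L);
        // Interpreted as text, the bytes are mostly NUL characters.
        System.out.println("as text: [" + new String(raw, StandardCharsets.UTF_8) + "]");
        System.out.println("as long: " + decode(raw));
    }
}
```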
    

    Start consumer 2 to watch the incoming input

    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning
    

    Type several sentences into the producer console and watch both consumers.



    That completes the Kafka Streams data-processing demo on Windows!

    Thanks for reading! If this helped, give it a ❤.


          Article link: https://www.haomeiwen.com/subject/smtckqtx.html