Processing Data with Kafka Streams


Author: CNSTT | Published 2018-12-17 15:42

Preface:

This article shows how to use Kafka Streams on Windows to process data, counting how many times each word occurs.
Official demo

Source code

WordCountDemo

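import java.util.Arrays;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Builder that collects the processing topology
final StreamsBuilder builder = new StreamsBuilder();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Lowercase each text line and split it into words on runs of non-word characters (\W+).
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));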
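The topology above can be checked without a broker. The plain-Java sketch below is a hypothetical, Kafka-free model (the names `WordCountSketch` and `countWords` are illustrative only). It applies the same three steps to an in-memory list of lines. First, each line is lowercased and split on `\W+`, as `flatMapValues` does. Second, each word becomes the key, as in `groupBy`. Third, occurrences are counted per key, as `count()` does. The sketch only produces the final contents of the table, whereas the real `KTable` is updated incrementally as each record arrives.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, Kafka-free model of the final state of the WordCount KTable.
class WordCountSketch {

    // Mirrors flatMapValues(...) -> groupBy((key, value) -> value) -> count()
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                // flatMapValues: lowercase each line and split it on runs of non-word characters
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // groupBy + count: the word becomes the key; occurrences are counted per key
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("all streams lead to kafka", "hello kafka streams");
        countWords(lines).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

The split is on runs of non-word characters. As a result, a line that begins with whitespace or punctuation yields a leading empty string, and both this sketch and the original snippet count that empty string as a word.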

Step by step

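Start Kafka. The Kafka startup steps are omitted here; see the link for details.
Change to the directory D:\com\kafka_2.11-2.0.1\bin\windows (adjust to your own installation path).
Create the topic streams-plaintext-input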

kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input

Create the topic streams-wordcount-output

kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact
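The output topic is created with `cleanup.policy=compact` because it carries a changelog. Each record holds the latest count for one word, so only the newest record per key is worth keeping. The plain-Java sketch below uses hypothetical names (`CompactionSketch`, `compact`) to model the *result* of compaction on a list of key/value records: it keeps the last value written for each key. This is a simplification. Real log compaction runs in the background and never touches the active segment, so consumers may still see older records for a while. Null-valued tombstones also get special handling, and the sketch ignores both points.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of what log compaction eventually retains: the last record per key.
// Ignores tombstones (null values) and the uncompacted active segment.
class CompactionSketch {

    static List<Map.Entry<String, Long>> compact(List<Map.Entry<String, Long>> log) {
        // First pass: remember the position of the newest record for every key
        Map<String, Integer> lastOffset = new HashMap<>();
        for (int offset = 0; offset < log.size(); offset++) {
            lastOffset.put(log.get(offset).getKey(), offset);
        }
        // Second pass: keep only those newest records, preserving their original order
        List<Map.Entry<String, Long>> compacted = new ArrayList<>();
        for (int offset = 0; offset < log.size(); offset++) {
            if (lastOffset.get(log.get(offset).getKey()) == offset) {
                compacted.add(log.get(offset));
            }
        }
        return compacted;
    }

    public static void main(String[] args) {
        // Changelog updates of the kind the WordCount topology emits (sample data)
        List<Map.Entry<String, Long>> changelog = List.of(
                Map.entry("kafka", 1L),
                Map.entry("streams", 1L),
                Map.entry("kafka", 2L),
                Map.entry("hello", 1L),
                Map.entry("streams", 2L));
        compact(changelog).forEach(e -> System.out.println(e.getKey() + "\t" + e.getValue()));
    }
}
```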

Start the application

kafka-run-class.bat org.apache.kafka.streams.examples.wordcount.WordCountDemo
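`WordCountDemo` ships with its own configuration. If you adapt the snippet above into your own application, that application needs at least two settings:
- `application.id`, which Kafka Streams also uses as the consumer group id and as the prefix of its internal topic names.
- `bootstrap.servers`.

The sketch below uses only `java.util.Properties`, keyed by the real Kafka Streams property names. In Kafka code you would normally write the `StreamsConfig` constants such as `StreamsConfig.APPLICATION_ID_CONFIG` instead. The class name and all values, including `streams-wordcount`, are assumptions for this local setup. You would then pass the object to `new KafkaStreams(builder.build(), props)` and call `start()`.

```java
import java.util.Properties;

// Hypothetical configuration for a local WordCount-style Kafka Streams application.
class StreamsConfigSketch {

    static Properties wordCountProps() {
        Properties props = new Properties();
        // Identifies the application; also the consumer group id and internal-topic prefix
        props.put("application.id", "streams-wordcount");
        // The local broker started earlier
        props.put("bootstrap.servers", "localhost:9092");
        // Default serdes for operators that are not given explicit ones
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        // 0 disables record caching, so every count update is forwarded downstream
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        wordCountProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```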

Start a producer (used later to write input)

kafka-console-producer.bat --broker-list localhost:9092 --topic streams-plaintext-input

Start consumer 1 to watch the word counts

kafka-console-consumer.bat --bootstrap-server localhost:9092 ^
    --topic streams-wordcount-output ^
    --from-beginning ^
    --formatter kafka.tools.DefaultMessageFormatter ^
    --property print.key=true ^
    --property print.value=true ^
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer ^
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
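Consumer 1 needs `LongDeserializer` for the values because the demo writes each count with the Long serde. That serde stores a count as 8 big-endian bytes rather than as text, so printed as-is the values would appear as unreadable bytes. The plain-Java sketch below reproduces that byte layout with `ByteBuffer`, which is big-endian by default. The names `LongEncodingSketch`, `encode` and `decode` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the 8-byte, big-endian layout used for Long values.
class LongEncodingSketch {

    // Same layout as Kafka's LongSerializer: 8 bytes, most significant byte first
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // The inverse, as LongDeserializer performs it (including a size check)
    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(2L);
        // As UTF-8 text these bytes are seven NUL characters followed by U+0002 (control characters)
        System.out.println("as text: [" + new String(raw, StandardCharsets.UTF_8) + "]");
        System.out.println("as long: " + decode(raw));
    }
}
```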

Start consumer 2 to watch the incoming input

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning

In the producer console, type several sentences, one per line, and watch the output of both consumers.
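When record caching is disabled, each input line changes the count of every word it contains, and each change goes to `streams-wordcount-output` as one record. The plain-Java sketch below models that changelog for a few sample lines. It uses hypothetical names and needs no Kafka dependency. Each update is formatted as `word<TAB>count`, which is roughly how consumer 1 shows key and value with `print.key=true`, since a tab is the formatter's default key separator. When caching is enabled, Kafka Streams may merge several updates to the same word into one record, so the real console can show fewer lines than the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the changelog records emitted by the WordCount topology (no caching).
class ChangelogSketch {

    static List<String> changelog(List<String> inputLines) {
        Map<String, Long> table = new HashMap<>();   // current KTable state: word -> count
        List<String> updates = new ArrayList<>();    // records written to the output topic
        for (String line : inputLines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                long count = table.merge(word, 1L, Long::sum);
                updates.add(word + "\t" + count);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("all streams lead to kafka", "hello kafka streams");
        changelog(lines).forEach(System.out::println);
    }
}
```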



This completes the demo of processing data with Kafka Streams on Windows!

Thanks for reading. If this helped, please give it a ❤!


Article link: https://www.haomeiwen.com/subject/smtckqtx.html