Preface:
This article shows how to use Kafka Streams on Windows to process data and count how often each word occurs.
Official demo
Source code
WordCountDemo
```java
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The builder that assembles the processing topology
final StreamsBuilder builder = new StreamsBuilder();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Lowercase each line and split it into words on runs of non-word characters (\W+),
    // which covers punctuation as well as whitespace.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the text words as message keys. No serdes are passed here, so the
    // repartition step uses the application's default serdes (these must handle String).
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));
```
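The `flatMapValues` step above lowercases each line and splits it on the regex `\\W+`. That means it splits on any run of non-word characters, not only on whitespace. The plain-Java sketch below has no Kafka dependency, and the class name `TokenizeSketch` is hypothetical. It reproduces only that step, so you can predict which keys `groupBy` will produce for a given input line.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: mirrors the flatMapValues lambda from the snippet above,
// without any Kafka dependency.
class TokenizeSketch {

    // Lowercase the line and split on runs of non-word characters (\W+), as the snippet does.
    // A line that starts with a non-word character yields a leading empty token,
    // which the snippet would also count as a "word".
    static List<String> tokenize(String line) {
        return Arrays.asList(line.toLowerCase().split("\\W+"));
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello, Kafka Streams!"));     // [hello, kafka, streams]
        System.out.println(tokenize("all-streams lead to kafka")); // [all, streams, lead, to, kafka]
    }
}
```

In Java, `\W` matches everything outside `[a-zA-Z_0-9]` unless `Pattern.UNICODE_CHARACTER_CLASS` is enabled. As a result, Chinese characters typed into the producer are treated as separators and never show up as words.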
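Getting started
Start Kafka. The Kafka startup steps are omitted here; see the linked article for details.
Change to the directory D:\com\kafka_2.11-2.0.1\bin\windows
Create the topic streams-plaintext-input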
kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
Create the topic streams-wordcount-output (as a compacted topic)
kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact
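The `cleanup.policy=compact` setting lets the broker eventually discard older records for a key and keep only the latest one. That suits a changelog of counts, where only the newest count per word matters. The sketch below is a rough model only: it ignores segments, timing and tombstones, and the class `CompactionSketch` is hypothetical. It treats compaction as "keep the last value per key":

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of log compaction (not the broker's actual algorithm).
class CompactionSketch {

    // Keep only the latest value for each key, ordered by the position of that latest record.
    static Map<String, Long> compact(List<Map.Entry<String, Long>> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : log) {
            latest.remove(record.getKey());                 // the older record for this key becomes obsolete
            latest.put(record.getKey(), record.getValue()); // re-insert at the newest position
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> log = new ArrayList<>();
        log.add(new SimpleEntry<>("kafka", 1L));
        log.add(new SimpleEntry<>("streams", 1L));
        log.add(new SimpleEntry<>("kafka", 2L));
        System.out.println(compact(log)); // {streams=1, kafka=2}
    }
}
```

Real compaction runs in the background and never touches the active segment. Because of that, a consumer reading with `--from-beginning` may still see superseded counts.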
Start the application
kafka-run-class.bat org.apache.kafka.streams.examples.wordcount.WordCountDemo
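The snippet's `groupBy((key, value) -> value)` passes no serdes, so the repartition step falls back to the application's default serdes. The sketch below shows the kind of configuration a word-count Streams app needs for the single local broker used here. The class name and every value in it are assumptions for this article's setup; they are not copied from the demo. It uses plain `Properties` with the string form of the config names, so it compiles without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: minimal Streams settings for the local setup in this article.
class StreamsConfigSketch {

    static Properties wordCountProps() {
        Properties props = new Properties();
        // Identifies the app; also used as the consumer group id and internal topic prefix
        props.setProperty("application.id", "streams-wordcount");
        // The local broker started earlier
        props.setProperty("bootstrap.servers", "localhost:9092");
        // String defaults so the serde-less groupBy(...) can repartition String keys and values
        props.setProperty("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.setProperty("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        // 0 disables record caching, so every count update is forwarded downstream
        props.setProperty("cache.max.bytes.buffering", "0");
        // Start from the beginning of the input topic when no committed offset exists
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        wordCountProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```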
Start the producer (used later to type input lines)
kafka-console-producer.bat --broker-list localhost:9092 --topic streams-plaintext-input
Start consumer 1 to watch the word counts
```bat
kafka-console-consumer.bat --bootstrap-server localhost:9092 ^
    --topic streams-wordcount-output ^
    --from-beginning ^
    --formatter kafka.tools.DefaultMessageFormatter ^
    --property print.key=true ^
    --property print.value=true ^
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer ^
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```
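Consumer 1 sets `value.deserializer` to `LongDeserializer` because the counts are written with a Long serde. That serde encodes each count as 8 big-endian bytes, not as text. The plain-Java sketch below uses `ByteBuffer` as a stand-in for Kafka's serializer, and its class name is hypothetical. It shows the encoding and why reading the bytes as a string does not print a number.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: ByteBuffer stands in for an 8-byte big-endian long serializer.
class LongEncodingSketch {

    // Encode a count as 8 bytes, most significant byte first (ByteBuffer's default order).
    static byte[] encode(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // Decode the 8 bytes back into a long, as a long deserializer would.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(3L);
        System.out.println(decode(raw)); // 3
        // Misreading the same bytes as UTF-8 text gives control characters, not "3".
        String asText = new String(raw, StandardCharsets.UTF_8);
        System.out.println(asText.equals("3")); // false
    }
}
```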
Start consumer 2 to watch the raw input
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning
Type several sentences into the producer console and watch the output of both consumers.
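To predict what consumer 1 prints, you can run a small simulation. The sketch below uses a hypothetical class `RunningCountSketch` with no Kafka dependency. It keeps a running count per word and emits one `word count` update for every token of every line, using the same lowercase-and-split-on-`\\W+` tokenization as the demo. This roughly matches the `count()` changelog when record caching is disabled. With caching enabled, Kafka Streams may merge several updates for the same word into one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the running-count changelog (no caching, single partition).
class RunningCountSketch {
    private final Map<String, Long> counts = new HashMap<>();

    // Process one input line and return the "word count" updates it would emit, in order.
    List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            long updated = counts.merge(word, 1L, Long::sum); // increment this word's running count
            updates.add(word + " " + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        RunningCountSketch sketch = new RunningCountSketch();
        System.out.println(sketch.process("hello kafka streams")); // [hello 1, kafka 1, streams 1]
        System.out.println(sketch.process("Hello, Kafka!"));       // [hello 2, kafka 2]
    }
}
```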