美文网首页
kafka-2.7.0(五)Kafka Streams 流处理

kafka-2.7.0(五)Kafka Streams 流处理

作者: _大叔_ | 来源:发表于2021-03-02 15:04 被阅读0次

    简述

    简单来说,Kafka Streams就是可以在kafka内部实现一套零延时、可定制的数据计算处理逻辑,可以把不连续的数据进行计算。

    入门

    启动 zookeeper 和 kafka

    bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh -daemon config/server.properties 
    

    创建topic {streams -plaintext-input,streams-plaintext-output}

    bin/kafka-topics.sh --create --zookeeper 192.168.81.62:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input 
    bin/kafka-topics.sh --create --zookeeper 192.168.81.62:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-output
    

    查看topic状态

    bin/kafka-topics.sh --zookeeper 192.168.81.62:2181 --describe 
    

    准备好了这些之后我们就可以运行 Kafka Streams 的 Word Count 程序了。 Kafka 自带了Demo 程序供用户使用

    bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo 
    

    上面的 WordCountDemo 会固定地读取名为 streams-plaintext-input topic ,为读取的每条消息执行 Word Count 程序的转换计算逻辑,然后持续地把处理结果固定写入 streams-wordcount-output中 。当运行上述命令时,会看不到任何输出。需要启动 kafka-console-consumer 才能看到最终消息。但我们先启动 Kafka 自带的 console producer 来生产一些输入数据供 Word Count 程序消费。

    bin/kafka-console-producer.sh --broker-list 192.168.81.62:9092 --topic streams-plaintext-input
    

    启动后暂时不要发送任何数据,接下来新建窗口启动 kafka-console-consumer

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.62:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    

    如果以上命令出现 WARN 警告信息,则是正常的,这是因为当 clients 首次向 broker 发送请求获取该 topic数据时,很可能尚未有该 topic 的元数据信息,故 broker 向 clients 返回的响应中会带上 LEADER_NOT_AVAILABLE 异常,表明 clients 应该主动更新元数据。

    接下来发送数据,而在 kafka-console-consumer 窗口就能看到数据统计。

    如果设置的kafka配置不是 localhost:9092 或者 127.0.01:9092,则运行不了,因为他的demo默认bootstrap.server就是localhost:9092,也没地方修改配置,除非修改源码,那就等于自定义一样。

    自定义 stream

    在上面的例子,因为我本身kafka配置的原因导致我无法使用stream,所以把代码改了改,让 bootstrap.server 指向我的配置。

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency>
    
    package com.example.demo.stream;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.*;
    import org.apache.kafka.streams.kstream.internals.KStreamImpl;
    
    import java.util.Arrays;
    import java.util.Locale;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @author big uncle
     * @date 2021/3/2 13:29
     * @module
     **/
    public class TJStream {
    
    
        public static void main(String[] args) {
    
            Properties properties = new Properties();
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-wordcount");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.81.62:9092") ;
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            //使得每次运行程序时都能保证从头消费一次消息。
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
            StreamsBuilder builder= new StreamsBuilder();
            // 指定输入 topic
            KStream<String, String> source = builder.stream("streams-plaintext-input");
            KTable<String,Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String s) {
                    return Arrays.asList(s.toLowerCase(Locale.getDefault()).split(" "));
                }
            }).groupBy(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(String s, String s2) {
                    return s2;
                }
            }).count();
            counts
                // 转换 KStream 类型
                .toStream()
                // 把 value 的 long 类型转换位 string 类型
                .map((k,v) -> new KeyValue<String,String>(k,String.valueOf(v)))
                // 发送到这个 topic
                .to("streams-wordcount-output",Produced.with(Serdes.String(),Serdes.String()));
    
            final KafkaStreams streams = new KafkaStreams (builder.build(), properties) ;
            // 添加监控,关闭之后释放资源
            final CountDownLatch latch = new CountDownLatch (1) ;
            Runtime.getRuntime().addShutdownHook (new Thread ("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                // 运行 这里不会阻塞
                streams.start();
                // 阻塞主线程
                latch.await();
            }catch(Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }
    

    kafka stream 里面的 Api 还是有必要学习一下的

    打成jar包,然后把jar放入 /kafka/libs 下,启动

    bin/kafka-run-class.sh com.example.demo.stream.TJStream
    

    启动之后会有很多警告,不需要管,按照如上的步骤我们继续操作就行,到此就能看到输入的数据。以下是我输出的统计信息,但是consumer在消费的时候比较慢,不是即时的。

    haha    4
    hah     1
    ha      5
    

    我们也可以使用 spring boot 来监听 streams-wordcount-output 这个 topic来接收数据

    相关文章

      网友评论

          本文标题:kafka-2.7.0(五)Kafka Streams 流处理

          本文链接:https://www.haomeiwen.com/subject/jwimfltx.html