Kafka Streams word count example


Author: go4it | Published 2017-10-14 12:53

    Kafka is not really just a message queue at heart; it is essentially a log storage system, and stream processing is a feature it has been promoting heavily of late. This article walks through a simple word count example.

    Maven

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>0.10.2.1</version>
            </dependency>
    

    Prepare the topic

    sh kafka-topics.sh --create --topic wc-input --replication-factor 1 --partitions 1 --zookeeper localhost:2181
    

    The input topic is wc-input. For the output side, to keep things simple the results are printed to the console here; you could of course also write them to another topic (a sketch of that follows in the KStream section below).
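
    If you do want to write the results to a second topic instead of the console, you would create it the same way; wc-output below is just a hypothetical name used for illustration in the sketches later in this post:

    sh kafka-topics.sh --create --topic wc-output --replication-factor 1 --partitions 1 --zookeeper localhost:2181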

    Configuration

        Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
            // Note: To re-run the demo, you need to use the offset reset tool:
            // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
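
    The reset tool referenced in the comment above ships with Kafka as kafka-streams-application-reset.sh. A hedged example invocation for this demo (flag names are from the 0.10.x/0.11.x tooling; check --help for your version):

    sh kafka-streams-application-reset.sh --application-id word-count-demo --input-topics wc-input --bootstrap-servers localhost:9092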
    

    Build the KStream topology

        // topology: read lines from wc-input, split into words, re-key by word, group and count
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("wc-input");
        KTable<String, Long> counts = source
                // split each line into lower-cased words
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                    }
                })
                // re-key each record so the word itself becomes the key
                .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(String key, String value) {
                        return new KeyValue<>(value, value);
                    }
                })
                .groupByKey()
                // materialize the running counts in a state store named "Counts"
                .count("Counts");
        // print every update of the KTable to stdout
        counts.print();
        KafkaStreams streams = new KafkaStreams(builder, props);
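
    To write the running counts to another topic instead of (or in addition to) printing them, the 0.10.2.x KTable API has a to() overload that takes explicit serdes. A minimal sketch, assuming the hypothetical wc-output topic created earlier (in the demo above this call would sit before the KafkaStreams constructor; the values are Longs, so downstream readers need a matching deserializer):

        // publish every count update to wc-output: key = word, value = running count
        counts.to(Serdes.String(), Serdes.Long(), "wc-output");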
    

    Start and shutdown

            final CountDownLatch latch = new CountDownLatch(1);
            // attach shutdown handler to catch control-c
            Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                e.printStackTrace();
            }
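
    One small variation worth considering: the catch block above only prints the stack trace, so a startup failure is visible only in the log. A hedged sketch that also reports it through the process exit code:

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);   // non-zero exit so the failure is visible to the caller
        }
        System.exit(0);       // normal exit after the shutdown hook has closed the streams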
    

    Test

    Input

    sh kafka-console-producer.sh --broker-list localhost:9092 --topic wc-input
    

    Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.

    Output

    Each line below is a single KTable update, printed as word , (newCount<-oldValue); the right-hand side is always null here because old values are not forwarded in this topology.

    [KSTREAM-AGGREGATE-0000000003]: streams , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: is , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: a , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: library , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: for , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: building , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: microservices, , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: where , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: input , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: output , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: data , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: are , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: stored , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: in , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: kafka , (2<-null)
    [KSTREAM-AGGREGATE-0000000003]: clusters. , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: it , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: combines , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: simplicity , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: writing , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: deploying , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: standard , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: java , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: and , (5<-null)
    [KSTREAM-AGGREGATE-0000000003]: scala , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: applications , (2<-null)
    [KSTREAM-AGGREGATE-0000000003]: on , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: client , (2<-null)
    [KSTREAM-AGGREGATE-0000000003]: side , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: with , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: the , (4<-null)
    [KSTREAM-AGGREGATE-0000000003]: benefits , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: of , (2<-null)
    [KSTREAM-AGGREGATE-0000000003]: kafka's , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: server-side , (1<-null)
    [KSTREAM-AGGREGATE-0000000003]: cluster , (1<-null)
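
    If the counts are also written to the hypothetical wc-output topic as sketched above, they can be inspected with the console consumer; the value deserializer must match the Long counts:

    sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wc-output --from-beginning \
        --property print.key=true \
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer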
    

