美文网首页
kafka-stream流式编程实现

kafka-stream流式编程实现

作者: 会飞的蜗牛66666 | 来源:发表于2018-11-09 11:31 被阅读0次

1.导入依赖包:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.0</version>
</dependency>

2.实现
public class kafkaStreamDemo {

public static void main(String[] args) throws Exception {

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667.node3:6667");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.stream("kafkaStream");
    //对value进行操作,构造一个ValueMapper
    final KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            //数据格式:java,scala,python,c
            return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(","));//按照逗号切割,并变为集合
        }
    }).map(new KeyValueMapper<String, String, KeyValue<String, ?>>() {
        @Override
        public KeyValue<String, String> apply(String key, String value) {//只取value,按照单词进行分组
            return new KeyValue<>(value, value);
        }
    }).groupByKey().count("countstore");
    counts.print();
    final KafkaStreams streams = new KafkaStreams(builder, props);

    //启动与关闭,开启一个任务执行
    final CountDownLatch latch = new CountDownLatch(1);

    //线程完毕以后释放流
    Runtime.getRuntime().addShutdownHook(new Thread("word-count") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();//流关闭的同时,latch值变为0
        }
    });

    try {
        streams.start();
        latch.await();//线程被挂起,等待latch的值变为0才重新开始执行
    } catch (IllegalStateException e) {
        e.printStackTrace();
    } catch (StreamsException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}
3.结果
输入数据格式:
java,scala,python,c
java,java,c
scala,java,java
java,scala,python,c
java,scala,python,c
.....
得到的结果:
[KSTREAM-AGGREGATE-0000000003]: java , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: scala , (7<-null)
[KSTREAM-AGGREGATE-0000000003]: python , (5<-null)
[KSTREAM-AGGREGATE-0000000003]: c , (5<-null)

kafka-stream作为轻量级的流式处理,处理简单的流业务,如日志监控等,简单指标监控等还是很有必要的。

相关文章

网友评论

      本文标题:kafka-stream流式编程实现

      本文链接:https://www.haomeiwen.com/subject/vkwnxqtx.html