美文网首页
2019-06-20 Kafka Streams学习笔记

2019-06-20 Kafka Streams学习笔记

作者: 51344b645c49 | 来源:发表于2019-06-20 14:40 被阅读0次

    简介理解

    传统的批处理注重结果(例如将一批的数据进行处理, 最终的处理结果才是最终想要的),而流注重变化,每一个输入都会对应输出,这种行为的不同,造成了批处理的相应较慢(或者说是需要等待很长时间),而流处理的相应较快。

    Kafka Streams 是一个库。一个基于Kafka的构建流处理程序的库,将流处理变得更为简单,特别是其输入一个Topic,输出是另一个Topic的程序。Kafka Streams不依赖于集群和框架,只是一个库,只需要Kafka和相关的处理代码,Kafka会去协调程序处理代码。


    Demo代码

    - Main入口

    public class KafkaStreamsDemo {

      public static void main(String[] args) {

        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "hellow_stream_demo");

        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 注册一个stream 并订阅topic

        KStream<String, String> stream1 = builder.stream("HelloWorld");

        // 转换方法1:flatMapValues

        KStream<String, String> stream2 = stream1.

            flatMapValues(new ValueMapper<String,Iterable<String>>(){

              @Override

              public Iterable<String> apply(String s) {

                List<String> list = Lists.newArrayList("flatMapValues::"+s);

                return list;

              }

            });

        stream2.to("hellow_stream_out");

        // 转换方法2:transform 进行转换

        KStream<String, String> stream3 = stream1.transform(new KafkaDemoTransformSupplier());

        stream3.to("hellow_stream_out_2");

        KafkaStreams streams = new KafkaStreams(builder.build(), config);

        streams.start();

      }

    }

    - Supplier类

    public class KafkaDemoTransformSupplier implements TransformerSupplier<String, String, KeyValue<String,String>> {

      @Override

      public Transformer<String, String, KeyValue<String, String>> get() {

        return new KafkaDemoTransformer();

      }

    }

    - transformer类

    public class KafkaDemoTransformer implements Transformer<String,String, KeyValue<String, String>> {

      private ProcessorContext processorContext;

      @Override

      public void init(ProcessorContext processorContext) {

        this.processorContext = processorContext;

      }

      @Override

      public KeyValue<String, String> transform(String key, String value) {

        String str = "transfer::"+value;

    // 此处开启,会推送两遍消息

    //    processorContext.forward(key,str);

        return new KeyValue<>(key,str);

      }

      @Override

      public void close() {

      }

    }

    参考资料

    https://www.cnblogs.com/devos/p/5616086.html

    https://blog.csdn.net/QYHuiiQ/article/details/89434173

    相关文章

      网友评论

          本文标题:2019-06-20 Kafka Streams学习笔记

          本文链接:https://www.haomeiwen.com/subject/yaadqctx.html