美文网首页
2019-06-20 Kafka Streams学习笔记

2019-06-20 Kafka Streams学习笔记

作者: 51344b645c49 | 来源:发表于2019-06-20 14:40 被阅读0次

简介理解

传统的批处理注重结果(例如将一批的数据进行处理, 最终的处理结果才是最终想要的),而流注重变化,每一个输入都会对应输出,这种行为的不同,造成了批处理的相应较慢(或者说是需要等待很长时间),而流处理的相应较快。

Kafka Streams 是一个库。一个基于Kafka的构建流处理程序的库,将流处理变得更为简单,特别是其输入一个Topic,输出是另一个Topic的程序。Kafka Streams不依赖于集群和框架,只是一个库,只需要Kafka和相关的处理代码,Kafka会去协调程序处理代码。


Demo代码

- Main入口

public class KafkaStreamsDemo {

  public static void main(String[] args) {

    Properties config = new Properties();

    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "hellow_stream_demo");

    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();

    // 注册一个stream 并订阅topic

    KStream<String, String> stream1 = builder.stream("HelloWorld");

    // 转换方法1:flatMapValues

    KStream<String, String> stream2 = stream1.

        flatMapValues(new ValueMapper<String,Iterable<String>>(){

          @Override

          public Iterable<String> apply(String s) {

            List<String> list = Lists.newArrayList("flatMapValues::"+s);

            return list;

          }

        });

    stream2.to("hellow_stream_out");

    // 转换方法2:transform 进行转换

    KStream<String, String> stream3 = stream1.transform(new KafkaDemoTransformSupplier());

    stream3.to("hellow_stream_out_2");

    KafkaStreams streams = new KafkaStreams(builder.build(), config);

    streams.start();

  }

}

- Supplier类

public class KafkaDemoTransformSupplier implements TransformerSupplier<String, String, KeyValue<String,String>> {

  @Override

  public Transformer<String, String, KeyValue<String, String>> get() {

    return new KafkaDemoTransformer();

  }

}

- transformer类

public class KafkaDemoTransformer implements Transformer<String,String, KeyValue<String, String>> {

  private ProcessorContext processorContext;

  @Override

  public void init(ProcessorContext processorContext) {

    this.processorContext = processorContext;

  }

  @Override

  public KeyValue<String, String> transform(String key, String value) {

    String str = "transfer::"+value;

// 此处开启,会推送两遍消息

//    processorContext.forward(key,str);

    return new KeyValue<>(key,str);

  }

  @Override

  public void close() {

  }

}

参考资料

https://www.cnblogs.com/devos/p/5616086.html

https://blog.csdn.net/QYHuiiQ/article/details/89434173

相关文章

网友评论

      本文标题:2019-06-20 Kafka Streams学习笔记

      本文链接:https://www.haomeiwen.com/subject/yaadqctx.html