Kafka Stream Simple Example (4) --- Defining a More Generic Serde


Author: 不1见2不3散4 | Published 2019-03-10 11:14

    This post builds on 《Kafka Stream简单示例(一)》, 《Kafka Stream简单示例(二)---聚合 Aggregation--统计总和》, and 《Kafka Stream简单示例(三)---自定义Serde》; reading those three posts first is recommended so that the context and requirements are clear.

    In part three, 《Kafka Stream简单示例(三)---自定义Serde》, we wrote a custom Serializer and Deserializer for the Statistics class. In practice, several classes may need serialization and deserialization support, so can there be a generic Serializer and Deserializer that simply take your own class and do the job? The answer is yes: as long as your class is a POJO, the generic JsonPOJOSerializer and JsonPOJODeserializer are enough.
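
    To make the "plug in your own class" point concrete, here is a minimal sketch (not from the project code; HumidityRecord is a hypothetical second POJO) of how the generic classes shown in full below would be reused. Only the value of the "JsonPOJOClass" property changes:

    Map<String, Object> serdeProps = new HashMap<>();

    final Serializer<HumidityRecord> humiditySerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", HumidityRecord.class);
    humiditySerializer.configure(serdeProps, false);

    final Deserializer<HumidityRecord> humidityDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", HumidityRecord.class);
    humidityDeserializer.configure(serdeProps, false);

    final Serde<HumidityRecord> humiditySerde = Serdes.serdeFrom(humiditySerializer, humidityDeserializer);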

    Note: the code in this example only demonstrates the flow; it is not production code and is for reference only. I take no responsibility for any problems caused by using it.

    The official documentation is here; I am using Kafka 1.0, so the link points to the 1.0 documentation: http://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html

    Project requirements

    Compute the sum and the average of the temperature readings within one minute (a fixed-size tumbling window). The maximum and minimum can be computed in the same way; a sketch follows below.
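
    The article only implements the sum and average. As a minimal sketch (not part of the article's code), a max aggregator in the same style could look like the class below; plugged into the windowedBy(...).aggregate(...) call shown later, with an initializer returning Long.MIN_VALUE and Serdes.Long() as the value serde, it would keep the largest reading in each window. Min works the same way with Math.min.

    package com.yq.generic;

    import org.apache.kafka.streams.kstream.Aggregator;

    // Sketch only: a max-tracking aggregator, analogous to the sum/average aggregator below.
    public class MaxTemperatureAggregator implements Aggregator<String, String, Long> {
        @Override
        public Long apply(String aggKey, String newValue, Long aggValue) {
            try {
                long temp = Long.parseLong(newValue.trim());   // sketch assumes plain numeric messages
                return Math.max(aggValue, temp);
            } catch (NumberFormatException e) {
                return aggValue;                               // ignore malformed messages
            }
        }
    }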

    Main flow and code

    The complete code is here; stars and forks are welcome. Thank you!

    A single result must contain both the sum and the average, so we design a simple data structure:

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Statistics {
        private Long avg;
        private Long sum;
        private Long count;
    }
    

    A careful reader will notice that the Statistics class in this post carries one more annotation, @NoArgsConstructor, than the one in 《Kafka Stream简单示例(三)---自定义Serde》. This is because deserialization later has to create Statistics objects through the default no-argument constructor, so the @NoArgsConstructor annotation is required.
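
    For reference, the Lombok annotations expand to roughly the following plain POJO (a sketch, not part of the project code; @Data also generates equals/hashCode/toString). The no-argument constructor is what Jackson uses when it instantiates the object during deserialization:

    public class Statistics {
        private Long avg;
        private Long sum;
        private Long count;

        // default constructor required by Jackson for deserialization
        public Statistics() {
        }

        public Statistics(Long avg, Long sum, Long count) {
            this.avg = avg;
            this.sum = sum;
            this.count = count;
        }

        public Long getAvg() { return avg; }
        public void setAvg(Long avg) { this.avg = avg; }
        public Long getSum() { return sum; }
        public void setSum(Long sum) { this.sum = sum; }
        public Long getCount() { return count; }
        public void setCount(Long count) { this.count = count; }
    }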

    As Serdes requires, we must provide a matching Serializer and Deserializer.


    [Figure: SerdesClass.png]

    We need to implement JsonPOJOSerializer and JsonPOJODeserializer, again using the implementations of LongSerializer and LongDeserializer as a reference.
    First, the serializer, JsonPOJOSerializer:

    package com.yq.generic;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.errors.SerializationException;
    
    import java.util.Map;
    
    /**
     * This is a copy of the official example; the copyright belongs to the Apache Kafka project.
     * It is copied locally so that this example can run on its own.
     * https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
     * className: JsonPOJOSerializer
     */
    
    
    public class JsonPOJOSerializer<T> implements Serializer<T> {
        private final ObjectMapper objectMapper = new ObjectMapper();
    
        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJOSerializer() {
        }
    
        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
        }
    
        @Override
        public byte[] serialize(String topic, T data) {
            if (data == null)
                return null;
    
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        }
    
        @Override
        public void close() {
        }
    
    }
    
    
    

    Next, the deserializer, JsonPOJODeserializer:

    package com.yq.generic;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.errors.SerializationException;
    
    import java.util.Map;
    
    /**
     * This is a copy of the official example; the copyright belongs to the Apache Kafka project.
     * It is copied locally so that this example can run on its own.
     * https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java
     * className: JsonPOJODeserializer
     */
    public class JsonPOJODeserializer<T> implements Deserializer<T> {
        private ObjectMapper objectMapper = new ObjectMapper();
    
        private Class<T> tClass;
    
        /**
         * Default constructor needed by Kafka
         */
        public JsonPOJODeserializer() {
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public void configure(Map<String, ?> props, boolean isKey) {
            tClass = (Class<T>) props.get("JsonPOJOClass");
        }
    
        @Override
        public T deserialize(String topic, byte[] bytes) {
            if (bytes == null)
                return null;
    
            T data;
            try {
                data = objectMapper.readValue(bytes, tClass);
            } catch (Exception e) {
                throw new SerializationException(e);
            }
    
            return data;
        }
    
        @Override
        public void close() {
    
        }
    }
    
    
    

    Finally, the main flow.
    Step one: define the Statistics serializer, deserializer, and the Serde built from them:

    Map<String, Object> serdeProps = new HashMap<>();

    final Serializer<Statistics> statisticsSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", Statistics.class);
    statisticsSerializer.configure(serdeProps, false);

    final Deserializer<Statistics> statisticsDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", Statistics.class);
    statisticsDeserializer.configure(serdeProps, false);

    final Serde<Statistics> statisticsSerde = Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);
    

    Step two: just like the built-in Serdes.Long() or Serdes.String(), statisticsSerde can now be used directly.
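
    As a quick sanity check (not in the original article), the serde can also be exercised outside of a topology. A round-trip sketch, assuming the classes above are on the classpath:

    // Serialize a Statistics object and read it back through the same serde.
    Statistics original = new Statistics(23L, 70L, 3L);   // avg, sum, count
    byte[] bytes = statisticsSerde.serializer().serialize("iot-temp-stat", original);
    Statistics restored = statisticsSerde.deserializer().deserialize("iot-temp-stat", bytes);
    System.out.println("sum=" + restored.getSum() + ", count=" + restored.getCount() + ", avg=" + restored.getAvg());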

    The KTable has the type KTable<Windowed<String>, Statistics>. The aggregate call's initializer and return value are both of type Statistics, and the state store declared with Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
    .withValueSerde(statisticsSerde) also stores Statistics values.

    package com.yq.generic;
    
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yq.customized.Statistics;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Aggregator;
    import org.apache.kafka.streams.kstream.Initializer;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.KeyValueMapper;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;
    import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
    import org.apache.kafka.streams.state.WindowStore;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * http://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html
     * Computes the sum and average of the temperature values within each 60-second window.
     * Messages in the topic are either plain numbers such as 30 or 21, or JSON such as {"temp":19, "humidity": 25}.
     */
    public class TemperatureAvgGenericSerDeDemo {
        private static final int TEMPERATURE_WINDOW_SIZE = 60;
    
        public static void main(String[] args) throws Exception {
    
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temp-avg");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    
            StreamsBuilder builder = new StreamsBuilder();
    
            KStream<String, String> source = builder.stream("iot-temp");
    
            Map<String, Object> serdeProps = new HashMap<>();
    
            final Serializer<Statistics> statisticsSerializer = new JsonPOJOSerializer<>();
            serdeProps.put("JsonPOJOClass", Statistics.class);
            statisticsSerializer.configure(serdeProps, false);
    
            final Deserializer<Statistics> statisticsDeserializer = new JsonPOJODeserializer<>();
            serdeProps.put("JsonPOJOClass", Statistics.class);
            statisticsDeserializer.configure(serdeProps, false);
    
            final Serde<Statistics> statisticsSerde = Serdes.serdeFrom(statisticsSerializer, statisticsDeserializer);
    
        KTable<Windowed<String>, Statistics> statistics = source
                    .selectKey(new KeyValueMapper<String, String, String>() {
                        @Override
                        public String apply(String key, String value) {
                            return "stat";
                        }
                    })
                    .groupByKey()
                    .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(TEMPERATURE_WINDOW_SIZE)))
                    .aggregate(
                            new Initializer<Statistics>() {
                                @Override
                                public Statistics apply() {
                                    Statistics avgAndSum = new Statistics(0L,0L,0L);
                                    return avgAndSum;
                                }
                            },
                            new Aggregator<String, String, Statistics>() {
                                @Override
                                public Statistics apply(String aggKey, String newValue, Statistics aggValue) {
                                    // messages on the topic are either a plain number or JSON like {"temp":19, "humidity": 25}
                                    System.out.println("aggKey:" + aggKey + ",  newValue:" + newValue + ", aggValue:" + aggValue);
                                    Long newValueLong = null;
                                    try {
                                        JSONObject json = JSON.parseObject(newValue);
                                        newValueLong = json.getLong("temp");
                                    }
                                    catch (ClassCastException ex) {
                                        try {
                                            newValueLong = Long.valueOf(newValue);
                                        }
                                         catch (NumberFormatException e) {
                                             System.out.println("Exception:" + e.getMessage());
                                             // if the value cannot be parsed, return the aggregate unchanged
                                             return aggValue;
                                        }
                                    }
                                    catch (Exception e) {
                                        System.out.println("Exception:" + e.getMessage());
                                    // on any other failure, return the aggregate unchanged
                                        return aggValue;
                                    }
    
    
                                    aggValue.setCount(aggValue.getCount() + 1);
                                    aggValue.setSum(aggValue.getSum() + newValueLong);
                                    aggValue.setAvg(aggValue.getSum() / aggValue.getCount());
    
                                    return aggValue;
                                }
                            },
                            Materialized.<String, Statistics, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-temp-stream-store")
                                    .withValueSerde(statisticsSerde)
                    );
    
            WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
            WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(), TEMPERATURE_WINDOW_SIZE);
            Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
    
        statistics.toStream().to("iot-temp-stat", Produced.with(windowedSerde, statisticsSerde));
    
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            final CountDownLatch latch = new CountDownLatch(1);
    
    
            Runtime.getRuntime().addShutdownHook(new Thread("streams-temperature-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }
    }
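
    To inspect the result topic outside of Kafka Streams, the same deserializers can be reused in a plain consumer. This is a sketch under the same assumptions as above (Kafka 1.0, the classes from this post on the classpath), not part of the original project:

    package com.yq.generic;

    import com.yq.customized.Statistics;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    /**
     * Sketch only: reads the aggregated results from the output topic using the
     * same (de)serializers as the streams application above.
     */
    public class StatResultConsumerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "iot-temp-stat-reader");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // Key: windowed String, mirroring the windowedSerde used when writing the topic.
            WindowedDeserializer<String> keyDeserializer =
                    new WindowedDeserializer<>(Serdes.String().deserializer(), 60L);

            // Value: Statistics, using the same generic JSON deserializer.
            JsonPOJODeserializer<Statistics> valueDeserializer = new JsonPOJODeserializer<>();
            Map<String, Object> serdeProps = new HashMap<>();
            serdeProps.put("JsonPOJOClass", Statistics.class);
            valueDeserializer.configure(serdeProps, false);

            try (KafkaConsumer<Windowed<String>, Statistics> consumer =
                         new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
                consumer.subscribe(Collections.singletonList("iot-temp-stat"));
                while (true) {
                    ConsumerRecords<Windowed<String>, Statistics> records = consumer.poll(1000L);
                    for (ConsumerRecord<Windowed<String>, Statistics> record : records) {
                        System.out.println("window start=" + record.key().window().start()
                                + ", stats=" + record.value());
                    }
                }
            }
        }
    }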
    

    Screenshots

    The figure already contains annotations; together with the code it should make the Kafka Streams flow clearer.


    [Figure: GenericPOJOSer.png]
