美文网首页
Kafka producer 序列化

Kafka producer 序列化

作者: 只是肿态度 | 来源:发表于2019-11-28 16:00 被阅读0次

    kafka需要将要发送的消息序列化为字节数组才能发送给Boker,kafka Client 自带了几种序列化方式:String、ByteArray、ByteBuffer、Bytes、Double、Long 。但是如果想使用自定义对象序列化的话,我们就需要构建一个自定义的序列化器。自定义的序列化器需要实org.apache.kafka.common.serialization.Serializer 的接口。

    1.首先创建一个自定义对象

    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public class Company {
        private String name;
        private String address;
    }
    

    2.实现Serializer 接口

    public class CompanySerializer implements Serializer<Company> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
     
        }
         
        //进行字节数组序列化
        @Override
        public byte[] serialize(String topic, Company data) {
            if(data == null){
                return null;
            }
            byte[] name, address;
            try{
                if(data.getName() != null){
                    name = data.getName().getBytes("UTF-8");
                }else {
                    name = new byte[0];
                }
                if(data.getAddress() != null){
                    address = data.getAddress().getBytes("UTF-8");
                }else{
                    address = new byte[0];
                }
                ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);
     
                byteBuffer.putInt(name.length);
                byteBuffer.put(name);
                byteBuffer.putInt(address.length);
                byteBuffer.put(address);
                return byteBuffer.array();
            }catch (UnsupportedEncodingException e){
                e.printStackTrace();
            }
            return new byte[0];
        }
     
        @Override
        public void close() {
     
        }
    }
    

    此时,自定义序列化器已经做好了,我们就可以使用了。

    @Test
    public void testSerializer() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设置key的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
        properties.put("bootstrap.servers", brokerList);
        KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);
        Company company = Company.builder().name("hiddenkafka").address("China").build();
        ProducerRecord<String, Company> record = new ProducerRecord<>(topic, company);
        producer.send(record).get();
    }
    

    相关文章

      网友评论

          本文标题:Kafka producer 序列化

          本文链接:https://www.haomeiwen.com/subject/mmybwctx.html