美文网首页Kafka
kafka_04_Kafka的序列化和反序列化

kafka_04_Kafka的序列化和反序列化

作者: 平头哥2 | 来源:发表于2019-03-25 21:09 被阅读0次

    序列化

    kafka自带的序列化器都实现了org.apache.kafka.common.serialization.Serializer接口

    该接口如下:

    package org.apache.kafka.common.serialization;
    
    import java.io.Closeable;
    import java.util.Map;
    
    /**
     * An interface for converting objects to bytes.
     *
     * A class that implements this interface is expected to have a constructor with no parameter.
     * <p>
     * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
     *
     * @param <T> Type to be serialized from.
     */
    public interface Serializer<T> extends Closeable {
    
        /**
         * Configure this class.
         *
         * Called once by the Kafka client before any call to {@link #serialize},
         * with the producer's configuration map.
         *
         * @param configs configs in key/value pairs
         * @param isKey whether is for key or value
         */
        void configure(Map<String, ?> configs, boolean isKey);
    
        /**
         * Convert {@code data} into a byte array.
         *
         * @param topic topic associated with data
         * @param data typed data
         * @return serialized bytes; may be null (implementations typically map a null
         *         input to a null output rather than throwing)
         */
        byte[] serialize(String topic, T data);
    
        /**
         * Close this serializer.
         *
         * This method must be idempotent as it may be called multiple times.
         */
        @Override
        void close();
    }
    

    方法直观明了,不再解释。

    其继承结构如下:

    序列化继承结构.jpg
    • 其中:org.apache.kafka.common.serialization.StringSerializer源码如下:
    package org.apache.kafka.common.serialization;
    
    import org.apache.kafka.common.errors.SerializationException;
    
    import java.io.UnsupportedEncodingException;
    import java.util.Map;
    
    /**
     *  String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
     *  value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
     */
    public class StringSerializer implements Serializer<String> {
        // Charset name used by serialize(); overridable through configuration.
        private String encoding = "UTF8";
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // The key-/value-specific property wins over the shared fallback.
            String specificProperty = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
            Object configured = configs.get(specificProperty);
            if (configured == null) {
                configured = configs.get("serializer.encoding");
            }
            if (configured instanceof String) {
                encoding = (String) configured;
            }
        }
    
        @Override
        public byte[] serialize(String topic, String data) {
            // Null input maps to null output (Kafka convention for absent keys/values).
            if (data == null) {
                return null;
            }
            try {
                return data.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
            }
        }
    
        @Override
        public void close() {
            // Stateless serializer: nothing to release.
        }
    }
    

    方法直观明了,不再解释。

    实体类定义如下:

    package com.ghq.kafka.entity;
    /**
     * Simple mutable value object carrying a company's name and address.
     * Used as the payload type for the custom Kafka (de)serializer demo.
     */
    public class Company {
    
        private String name;
        private String address;
    
        /** No-arg constructor; both fields start as null. */
        public Company() {
        }
    
        /**
         * @param name    company name (may be null)
         * @param address company address (may be null)
         */
        public Company(String name, String address) {
            this.name = name;
            this.address = address;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getAddress() {
            return address;
        }
    
        public void setAddress(String address) {
            this.address = address;
        }
    
        @Override
        public String toString() {
            // %s renders null as "null", matching plain string concatenation.
            return String.format("Company{name='%s', address='%s'}", name, address);
        }
    }
    
    

    序列化器定义如下(实现Serializer接口即可):

    package com.ghq.kafka.server;
    
    import com.ghq.kafka.entity.Company;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    
    /**
     * Serializes a {@link Company} into the wire format:
     * [4-byte name length][name bytes][4-byte address length][address bytes],
     * with both strings encoded as UTF-8 and null fields written as length 0.
     */
    public class CompanySerializer implements Serializer<Company> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // No configuration needed.
        }
    
        /**
         * @param topic topic associated with the record (unused)
         * @param data  company to encode; null maps to null per Kafka convention
         * @return length-prefixed UTF-8 encoding of name and address, or null
         */
        @Override
        public byte[] serialize(String topic, Company data) {
            if (data == null) {
                return null;
            }
    
            // StandardCharsets.UTF_8 cannot fail, unlike getBytes("UTF-8") which
            // declares a checked UnsupportedEncodingException; this removes the old
            // catch block that swallowed the exception and returned an empty array
            // (a corrupt payload the deserializer would reject).
            byte[] name = data.getName() == null
                    ? new byte[0]
                    : data.getName().getBytes(StandardCharsets.UTF_8);
            byte[] address = data.getAddress() == null
                    ? new byte[0]
                    : data.getAddress().getBytes(StandardCharsets.UTF_8);
    
            // Two 4-byte length prefixes plus the payloads.
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        }
    
        @Override
        public void close() {
            // Stateless serializer: nothing to release.
        }
    }
    

    主方法实现逻辑如下:

    package com.ghq.kafka.server;
    
    import com.ghq.kafka.entity.Company;
    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    /** Demo producer that sends {@link Company} values with a custom serializer. */
    public class CompanyProducerDemo {
    
        public static final String brokerList = "192.168.52.135:9092";
        public static final String topic = "company-demo";
    
        /**
         * Builds the producer configuration.
         * @return properties with bootstrap servers, client id, retries and serializers set
         */
        public static Properties initProperties(){
            Properties prop = new Properties();
            // Keys are Strings (see KafkaProducer<String, Company> below), so the key
            // serializer must be StringSerializer. The original code registered
            // CompanySerializer for keys, which only "worked" because the demo sends
            // null keys — any non-null String key would hit a ClassCastException.
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // Values are Company objects, handled by our custom CompanySerializer.
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
            prop.put(ProducerConfig.CLIENT_ID_CONFIG,"company.client.id.demo");
            prop.put(ProducerConfig.RETRIES_CONFIG,10);
            return prop;
        }
    
        public static void main(String[] args) {
            async();
        }
    
        /**
         * Sends the same record twice with asynchronous callbacks (anonymous class
         * and lambda forms), then closes the producer; close() flushes pending sends.
         */
        public static void async() {
            Properties prop = initProperties();
            // Note the generic types: String keys, Company values.
            KafkaProducer<String, Company> producer = new KafkaProducer<>(prop);
            Company company = new Company("xxxxxxxx","yyyyyyyyyy");
            ProducerRecord<String, Company> record = new ProducerRecord<>(topic, company);
    
            producer.send(record, new Callback() {
    
                /**
                 * exception == null means the send succeeded; otherwise it failed.
                 * (On failure metadata may still be non-null, carrying the
                 * topic-partition with -1 offsets — test exception, not metadata.)
                 */
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
    
                    if (exception != null) {
                        System.out.println("消息1发送失败:"+metadata);
                    }else {
                        System.out.println("消息1发送成功:"+metadata);
                    }
                }
            });
    
            // Same callback expressed as a lambda.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("消息2发送失败:"+metadata);
                }else {
                    System.out.println("消息2发送成功:"+metadata);
                }
            });
    
            producer.close();
        }
    
    }
    

    反序列化

    kafka自带的反序列化器都实现了org.apache.kafka.common.serialization.Deserializer接口

    该接口如下:

    public interface Deserializer<T> extends Closeable {
    
        /**
         * Configure this class.
         * @param configs configs in key/value pairs
         * @param isKey whether is for key or value
         */
        void configure(Map<String, ?> configs, boolean isKey);
    
        /**
         * Deserialize a record value from a byte array into a value or object.
         * @param topic topic associated with the data
         * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
         * @return deserialized typed data; may be null
         */
        T deserialize(String topic, byte[] data);
    
        /**
         * Close this deserializer.
         *
         * This method must be idempotent as it may be called multiple times.
         */
        @Override
        void close();
    }

    其继承结构如下:

    反序列化继承结构.png
    • 其中:org.apache.kafka.common.serialization.StringDeserializer 类如下:
    import java.io.UnsupportedEncodingException;
    import java.util.Map;
    
    /**
     *  String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
     *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
     */
    public class StringDeserializer implements Deserializer<String> {
        // Charset name used by deserialize(); overridable through configuration.
        private String encoding = "UTF8";
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // The key-/value-specific property wins over the shared fallback.
            String specificProperty = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
            Object configured = configs.get(specificProperty);
            if (configured == null) {
                configured = configs.get("deserializer.encoding");
            }
            if (configured instanceof String) {
                encoding = (String) configured;
            }
        }
    
        @Override
        public String deserialize(String topic, byte[] data) {
            // Null input maps to null output (Kafka convention for absent keys/values).
            if (data == null) {
                return null;
            }
            try {
                return new String(data, encoding);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
            }
        }
    
        @Override
        public void close() {
            // Stateless deserializer: nothing to release.
        }
    }
    

    自定义的反序列化器如下(和上文的序列化器对应):

    package com.ghq.kafka.client;
    
    import com.ghq.kafka.entity.Company;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    public class CompanyDeserializer implements Deserializer<Company> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
    
        }
    
        @Override
        public Company deserialize(String topic, byte[] data) {
    
            if (data == null) {
                return null;
            }
            if (data.length < 8) {
                throw new SerializationException("序列化失败...");
            }
    
            ByteBuffer buffer = ByteBuffer.wrap(data);
    
            int nameLength, addressLength;
            String name = null, address = null;
    
            nameLength = buffer.getInt();
            byte[] nameBytes = new byte[nameLength];
            buffer.get(nameBytes);
    
            addressLength = buffer.getInt();
            byte[] addressBytes = new byte[addressLength];
            buffer.get(addressBytes);
    
            try {
    
                name = new String(nameBytes, "UTF-8");
                address = new String(addressBytes, "UTF-8");
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            return new Company(name, address);
        }
    
        @Override
        public void close() {
    
        }
    }
    
    

    消费者客户端编写:

    package com.ghq.kafka.client;
    
    import com.ghq.kafka.entity.Company;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /** Demo consumer that reads {@link Company} values with a custom deserializer. */
    public class CompanyConsumerDemo {
    
        public static final String brokerList = "192.168.52.135:9092";
        public static final String topic = "company-demo";
        public static final String groupId = "group.demo";
    
        /**
         * Builds the consumer configuration.
         * @return properties with bootstrap servers, group/client ids and deserializers set
         */
        public static Properties initProperties(){
            Properties prop = new Properties();
            // Keys are Strings (see KafkaConsumer<String, Company> below), so the key
            // deserializer must be StringDeserializer. The original code registered
            // CompanyDeserializer for keys, which only "worked" because the demo's
            // records carry null keys — a real String key would fail to decode.
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Values are Company objects, decoded by our custom CompanyDeserializer.
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "company-id-demo");
            return prop;
        }
    
        public static void main(String[] args) {
            Properties prop = initProperties();
            KafkaConsumer<String, Company> consumer = new KafkaConsumer<>(prop);
            consumer.subscribe(Collections.singletonList(topic));
            // Poll forever; each received Company is printed via its toString().
            while (true) {
                ConsumerRecords<String, Company> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, Company> record : records) {
                    System.out.println("--------------->:" + record.value());
                }
            }
    
        }
    }
    
    

    分别执行生产者和消费者客户端,结果如下:

    消息1发送成功:company-demo-0@4
    消息2发送成功:company-demo-0@5
    
    --------------->:Company{name='xxxxxxxx', address='yyyyyyyyyy'}
    --------------->:Company{name='xxxxxxxx', address='yyyyyyyyyy'}
    

    结束。

    相关文章

      网友评论

        本文标题:kafka_04_Kafka的序列化和反序列化

        本文链接:https://www.haomeiwen.com/subject/tokivqtx.html