美文网首页Kafka大数据栈
7. kafka序列化&反序列化

7. kafka序列化&反序列化

作者: 阿飞的博客 | 来源:发表于2018-07-03 14:07 被阅读1253次

    序列化

    kafka序列化消息是在生产端,序列化后,消息才能网络传输。而构造KafkaProducer代码如下:

    Properties props = new Properties();
    props.put("bootstrap.servers", "10.0.55.229:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProducer = new KafkaProducer<>(props);
    

    属性key.serializervalue.serializer就是key和value指定的序列化方式。无论是key还是value序列化和反序列化实现都是一样的,所以接下来都只以value的序列化和反序列为例。

    StringSerializer

    StringSerializer是内置的字符串序列化方式,核心源码如下:

    /**
     *  String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
     *  value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
     */
    public class StringSerializer implements Serializer<String> {
        private String encoding = "UTF8";
        ... ...
    
        @Override
        public byte[] serialize(String topic, String data) {
            try {
                // 如果数据为空,那么直接返回null即可
                if (data == null)
                    return null;
                else
                    // 否则将String序列化,即转为byte[]即可
                    return data.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
            }
        }
    
        @Override
        public void close() {
            // nothing to do
        }
    }
    

    自定义序列化

    和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口Serializer。假设每个字段按照下图所示的方式自定义序列化:

    image.png

    下面是一个简单的自定义实现(假设自定义Order类型的Serializer,Order类中有一些String,Integer,Long,Date类型的属性--其他类型暂不支持,读者可以自行扩展):

    /**
     * @author wangzhenfei9
     * @version 1.0.0
     * @since 2018年06月22日
     */
    public class OrderSerializer implements Serializer<Order> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to do
        }
    
        @Override
        public byte[] serialize(String topic, Order data){
    
            if (data == null) {
                return null;
            }
    
            Class<? extends Order> clazz = data.getClass();
            Field[] fields = clazz.getDeclaredFields();
    
            // 计算保存Order所示属性值总计需要多少字节
            int total = 0;
    
            // 遍历所有属性字段, 目前只支持Integer, Long, String, Date类型
            for (Field field:fields){
                String fieldName = field.getName();
                int fieldLen = fieldName.getBytes().length;
                Object value = getPropertyValue(data, fieldName);
                String type = field.getType().getSimpleName();
                System.out.println("propertyName: "+fieldName +", value: " + value
                        +", len: " + fieldLen+", type: " + type);
                // 每个属性序列化方式: 属性长度(都是4个字节)+属性名称(长度需要计算)+值长度+值
                switch (type){
                    case "Long":
                        // 第一个4表示属性名长度需要的空间, 即int的长度;(int类型反序列化后需要4个字节长度,即32位)
                        // 第二个fieldLen表示属性名需要的空间
                        // 第三个4表示属性值长度需要的空间
                        // 第四个8表示属性值需要的空间(Long类型反序列化后需要8个字节长度,即64位)
                        total+=(4+fieldLen+4+8);
                        break;
                    case "Date":
                        // +8+8
                        // 如果是日期类型先转成Long类型的timestamp
                        total+=(4+fieldLen+4+8);
                        break;
                    case "Integer":
                        total+=(4+fieldLen+4+4);
                        break;
                    case "String":
                        try {
                            total+=(4+fieldLen + 4 + value.toString().getBytes("utf-8").length);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                        break;
                    // 本次自定义的Serializer只支持Integer, Long, Date, String类型属性, 如果想要支持其他类型属性, 请自行实现
                    default:
                        throw new IllegalArgumentException("Unsupported argument type: "+type);
                }
            }
    
            // 算出需要个byte[]数组长度后, 分配同样大的byte[]中
            ByteBuffer result = ByteBuffer.allocate(total);
    
            for (Field field:fields){
                String fieldName = field.getName();
                byte[] fieldByte = fieldName.getBytes();
                int fieldLen = fieldByte.length;
    
                Object value = getPropertyValue(data, fieldName);
                String type = field.getType().getSimpleName();
    
                // 无论什么类型都要先put属性长度和属性名称
                result.putInt(fieldLen);
                result.put(fieldByte);
    
                switch (type){
                    case "Long":
                        byte[] longByte = SerializeUtil.longSerialize((Long) value);
                        result.putInt(longByte.length);
                        result.put(longByte);
                        break;
                    case "Date":
                        // 如果是日期类型先转成Long类型的timestamp
                        byte[] dateByte = SerializeUtil.longSerialize(((Date) value).getTime());
                        result.putInt(dateByte.length);
                        result.put(dateByte);
                        break;
                    case "Integer":
                        byte[] integerByte = SerializeUtil.integerSerialize((Integer) value);
                        result.putInt(integerByte.length);
                        result.put(integerByte);
                        break;
                    case "String":
                        byte[] stringByte = ((String)value).getBytes();
                        result.putInt(stringByte.length);
                        result.put(stringByte);
                        break;
                    default:
                        throw new IllegalArgumentException("Unsupported argument type: "+type);
                }
            }
    
            // 返回序列化后的结果
            return result.array();
        }
    
    
        private Object getPropertyValue(Order order, String propertyName) {
            Method[] methods = order.getClass().getMethods();
            for (Method method : methods) {
                // 这里方法匹配还不够严谨
                if (method.getName().equalsIgnoreCase("get" + propertyName)
                        || method.getName().equalsIgnoreCase("is" + propertyName)) {
                    try {
                        return method.invoke(order);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            return null;
        }
    
        @Override
        public void close() {
            // nothing to do
        }
    
    }
    
    

    自定义Serializer后,修改属性value.serializer的值为com.afei.kafka.serialization.OrderSerializer,且ProducerRecord申明为ProducerRecord<String, Order>

    反序列化

    kafka反序列化消息是在消费端。由于网络传输过来的是byte[],只有反序列化后才能得到生产者发送的真实的消息内容。而构造KafkaConsumer代码如下:

    Properties props = new Properties();
    props.put("bootstrap.servers", "10.0.55.229:9092");
    props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    

    属性key.deserializervalue.deserializer就是key和value指定的反序列化方式。

    StringDeserializer

    StringDeserializer是内置的字符串反序列化方式,核心源码如下:

    /**
     *  String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
     *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
     */
    public class StringDeserializer implements Deserializer<String> {
        private String encoding = "UTF8";
        ... ...
    
        @Override
        public String deserialize(String topic, byte[] data) {
            try {
                // 如果数据为空,那么直接返回null即可
                if (data == null)
                    return null;
                else
                    // 否则将byte[]反序列化,即转为String即可
                    return new String(data, encoding);
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
            }
        }
    
        ... ...
    }
    

    自定义反序列化

    和内置的StringDeserializer字符串反序列化一样,如果要自定义反序列化方式,需要实现接口Deserializer。下面是一个简单的自定义实现(假设自定义Order类型的Deserializer,Order类中有一些String,Integer,Long,Date类型的属性--其他类型暂不支持),反序列化就是根据序列化的方式得到序列化前的内容:

    /**
     * @author wangzhenfei9
     * @version 1.0.0
     * @since 2018年06月22日
     */
    public class OrderDeserializer implements Deserializer<Order> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to do
        }
    
        @Override
        public Order deserialize(String topic, byte[] bytes) {
    
            ByteBuffer data = ByteBuffer.wrap(bytes);
    
            Field[] declaredFields = new Field[0];
            try {
                declaredFields = Class.forName(Order.class.getCanonicalName()).getDeclaredFields();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
    
            int propertyCount = declaredFields.length;
    
            Order order = new Order();
    
            while ((propertyCount--)>0){
                int propertyLen = data.getInt();
                byte[] nameByte = new byte[propertyLen];
                data.get(nameByte);
                String propertyName = new String(nameByte);
                //根据属性名称得到属性的类型
                String propertyType = getPropertyType(Order.class, propertyName);
    
                int valueLen = data.getInt();
                byte[] valueByte = new byte[valueLen];
                data.get(valueByte);
    
                Object value;
    
                switch (propertyType ){
                    case "Long":
                        value = DeserializeUtil.longDeserialize(valueByte);
                        break;
                    case "Date":
                        // 如果是日期类型先反序列化成Long类型, 然后得到Date类型的值
                        // result.putLong(((Date) value).getTime());
                        value = new Date(DeserializeUtil.longDeserialize(valueByte));
                        break;
                    case "Integer":
                        value = DeserializeUtil.integerDeserialize(valueByte);
                        break;
                    case "String":
                        value = new String(valueByte);
                        break;
                    default:
                        throw new IllegalArgumentException("Unsupported argument type: "+propertyType);
                }
    
                setPropertyValue(order, propertyName, value);
                System.out.println("property = "+propertyLen+", name = "+propertyName
                        +", value = "+valueLen+", name = "+value);
            }
            return order;
        }
    
        private String getPropertyType(Class<? extends Order> clazz, String propertyName) {
             /*
            * 得到类中的方法
            */
            try {
                Field field = clazz.getDeclaredField(propertyName);
                return field.getType().getSimpleName();
            } catch (NoSuchFieldException e) {
    
            }
            return null;
        }
    
        /**
         * 调用Order中属性${propertyName}的setter方法并赋值${propertyValue}
         */
        private void setPropertyValue(Order order, String propertyName, Object propertyValue) {
             /*
            * 得到类中的方法
            */
            Method[] methods = order.getClass().getMethods();
            for (Method method : methods) {
                // 这里方法匹配还不够严谨
                if (method.getName().equalsIgnoreCase("set" + propertyName)) {
                    try {
                        method.invoke(order, propertyValue);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        @Override
        public void close() {
            // nothing to do
        }
    
    }
    

    自定义Serializer后,修改属性value.serializer的值为com.afei.kafka.serialization.OrderDeserializer,且KafkaConsumer申明为KafkaConsumer<String, Order>

    总结

    可以看到,自定义Serializer和Deserializer非常痛苦,而且上面还有很多异常情况没有处理,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化推荐直接使用StringSerializerStringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。

    相关文章

      网友评论

        本文标题:7. kafka序列化&反序列化

        本文链接:https://www.haomeiwen.com/subject/wngduftx.html