kafka_08_Serializing and deserializing with a third-party tool

Author: 平头哥2 | Published: 2019-03-31 09:43

    1. Gradle dependencies

    // The `compile` configuration was removed in Gradle 7; use `implementation`
    // (on older Gradle versions `compile` still works).
    // https://mvnrepository.com/artifact/io.protostuff/protostuff-core
    implementation group: 'io.protostuff', name: 'protostuff-core', version: '1.6.0'
    
    // https://mvnrepository.com/artifact/io.protostuff/protostuff-runtime
    implementation group: 'io.protostuff', name: 'protostuff-runtime', version: '1.6.0'
    

    2. Writing the serializer class

    import com.ghq.kafka.entity.Company;
    import io.protostuff.LinkedBuffer;
    import io.protostuff.ProtostuffIOUtil;
    import io.protostuff.Schema;
    import io.protostuff.runtime.RuntimeSchema;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    import java.util.Map;
    
    public class ProtostuffSerializer implements Serializer<Company> {
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // No configuration needed.
        }
    
        @Override
        public byte[] serialize(String topic, Company data) {
            if (data == null) {
                return null;
            }
            // A typed schema avoids the unchecked raw-type call.
            Schema<Company> schema = RuntimeSchema.getSchema(Company.class);
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                return ProtostuffIOUtil.toByteArray(data, schema, buffer);
            } catch (RuntimeException e) {
                // Fail loudly: swallowing the error and returning null would
                // publish a record with a null value.
                throw new SerializationException("Failed to serialize Company for topic " + topic, e);
            } finally {
                buffer.clear();
            }
        }
    
        @Override
        public void close() {
    
        }
    }
    
    

    3. Writing the deserializer class

    import com.ghq.kafka.entity.Company;
    import io.protostuff.ProtostuffIOUtil;
    import io.protostuff.Schema;
    import io.protostuff.runtime.RuntimeSchema;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.util.Map;
    
    public class ProtostuffDeserializer implements Deserializer<Company> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
    
        }
    
        @Override
        public Company deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
    
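            // Same runtime schema that the serializer uses for Company.
            Schema<Company> schema = RuntimeSchema.getSchema(Company.class);
    
            // mergeFrom fills an existing instance, so create an empty one first.
            Company company = new Company();
    
            ProtostuffIOUtil.mergeFrom(data, company, schema);
            return company;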
        }
    
        @Override
        public void close() {
    
        }
    }
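
Both classes import `com.ghq.kafka.entity.Company`, which the article does not show. The sketch below is only a plausible stand-in: the `name` and `address` fields are assumptions made for illustration, not the author's actual entity. The deserializer above calls `new Company()`, so a no-argument constructor is needed.

```java
// Hypothetical stand-in for com.ghq.kafka.entity.Company (not shown in the article).
// In a real project this would be a public class in that package.
class Company {
    // Assumed fields, chosen only for illustration.
    private String name;
    private String address;

    // Required by the deserializer, which calls `new Company()`.
    public Company() {
    }

    public Company(String name, String address) {
        this.name = name;
        this.address = address;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "Company{name='" + name + "', address='" + address + "'}";
    }
}
```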
    
    

    How to use it?

    See: https://www.jianshu.com/p/35a432bcb006
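
As a rough sketch of the wiring, the `ProtostuffSerializer` and `ProtostuffDeserializer` classes above are registered through the standard `value.serializer` and `value.deserializer` client settings. The listings have no `package` statement, so the bare class names are used here; prefix them with the package if yours has one. The helper class `ProtostuffClientConfig`, the broker address, and the group id are all placeholders invented for this example.

```java
import java.util.Properties;

// Sketch only: builds client configuration. Class names are passed as strings,
// so this file compiles without Kafka on the classpath.
class ProtostuffClientConfig {

    // Assumption: the classes live in the default package, as in the listings above.
    static final String VALUE_SERIALIZER = "ProtostuffSerializer";
    static final String VALUE_DESERIALIZER = "ProtostuffDeserializer";

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", VALUE_SERIALIZER);
        return props;
    }

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", VALUE_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "group.demo" are placeholder values.
        Properties producer = producerProps("localhost:9092");
        Properties consumer = consumerProps("localhost:9092", "group.demo");
        System.out.println(producer.getProperty("value.serializer"));
        System.out.println(consumer.getProperty("value.deserializer"));
    }
}
```

With the Kafka client library on the classpath, these properties would be passed to `new KafkaProducer<String, Company>(props)` and `new KafkaConsumer<String, Company>(props)` respectively.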
