Consuming GZIP Messages from Kafka

Author: 宝华殿法师 | Published 2019-05-27 15:06

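Got a new requirement: consume compressed messages from Kafka. I naively assumed that since the Producer has a setting like compression.type, the Consumer would naturally have a matching decompressor (something like decompression.type). No such luck. (Strictly speaking, compression configured through the producer's compression.type is undone by the consumer automatically, with no consumer setting required. The messages here were presumably gzipped by the producing application itself, so the consumer receives raw GZIP bytes in the record value.)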
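Before decompressing anything, a consumer can check whether a value really is GZIP. Per RFC 1952, every GZIP stream begins with the two magic bytes 0x1f 0x8b. The following is a minimal sketch. The class `GzipSniffer` and the method `looksLikeGzip` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: checks for the GZIP magic bytes at the start of a payload.
public class GzipSniffer {

    // Every GZIP member starts with ID1 = 0x1f, ID2 = 0x8b (RFC 1952).
    static boolean looksLikeGzip(byte[] data) {
        return data != null
                && data.length >= 2
                && (data[0] & 0xff) == 0x1f
                && (data[1] & 0xff) == 0x8b;
    }

    public static void main(String[] args) throws IOException {
        byte[] plain = "hello".getBytes(StandardCharsets.UTF_8);

        // Compress the same text so there is a real GZIP payload to test against.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(plain);
        }

        System.out.println(looksLikeGzip(buf.toByteArray())); // true
        System.out.println(looksLikeGzip(plain));             // false
    }
}
```

A magic-byte check only says the payload looks like GZIP. A corrupt stream can still fail during decompression.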
Java GZIP utility classes need no introduction, so here's the code:

    package com.halooye.base.utils;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.charset.Charset;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public class GZIPUtils {
        public static final String GZIP_ENCODE_UTF_8 = "UTF-8";
        public static final String GZIP_ENCODE_ISO_8859_1 = "ISO-8859-1";

        // GZIP-compresses the string's bytes in the given charset; null/empty input returns null.
        public static byte[] compress(String str, String encoding) {
            if (str == null || str.isEmpty()) {
                return null;
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // try-with-resources always closes the stream; closing writes the GZIP trailer,
            // so toByteArray() must be called after the try block.
            try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
                gzip.write(str.getBytes(Charset.forName(encoding)));
            } catch (IOException e) {
                throw new UncheckedIOException("GZIP compression failed", e);
            }
            return out.toByteArray();
        }

        public static byte[] compress(String str) {
            return compress(str, GZIP_ENCODE_UTF_8);
        }

        // Decompresses GZIP bytes; null/empty input returns null, invalid input throws.
        public static byte[] uncompress(byte[] bytes) {
            if (bytes == null || bytes.length == 0) {
                return null;
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPInputStream ungzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
                byte[] buffer = new byte[4096];
                int n;
                // read() returns -1 at end of stream
                while ((n = ungzip.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
            } catch (IOException e) {
                throw new UncheckedIOException("GZIP decompression failed", e);
            }
            return out.toByteArray();
        }

        // Decompresses, then decodes the result with the given charset.
        public static String uncompressToString(byte[] bytes, String encoding) {
            byte[] raw = uncompress(bytes);
            return raw == null ? null : new String(raw, Charset.forName(encoding));
        }

        public static String uncompressToString(byte[] bytes) {
            return uncompressToString(bytes, GZIP_ENCODE_UTF_8);
        }

        public static void main(String[] args) {
            String s = "this is a message";
            byte[] compressed = compress(s);
            System.out.println("String length: " + s.length());
            System.out.println("Compressed: " + compressed.length);
            System.out.println("Decompressed: " + uncompress(compressed).length);
            System.out.println("Decompressed string: " + uncompressToString(compressed).length());
        }
    }
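On Java 9 or later, `InputStream.readAllBytes()` can replace the manual buffer-copy loop used for decompression. The following is a minimal sketch assuming Java 9+. The class `GzipBytes` and its methods `gzip`/`gunzip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical byte-level GZIP helpers (Java 9+).
public class GzipBytes {

    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the stream writes the GZIP trailer (CRC-32 and input size).
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes(); // Java 9+: reads until end of stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] original = "this is a message".getBytes(StandardCharsets.UTF_8);
        byte[] restored = gunzip(gzip(original));
        System.out.println(new String(restored, StandardCharsets.UTF_8)); // this is a message
    }
}
```

Working on `byte[]` rather than `String` keeps the charset decision with the caller, which matters once the bytes come from Kafka.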
    

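Since the utility's decompression methods take a byte[], I grudgingly went back to the data source and found the deserializer configuration: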

    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
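With this configuration the GZIP bytes are already damaged by the time they become a String. `StringDeserializer` decodes with UTF-8 by default, and GZIP output is generally not valid UTF-8: the second header byte, 0x8b, is already a stray continuation byte. Invalid sequences are replaced with U+FFFD, so the original bytes cannot be recovered from the String, and decompression has to happen on the raw `byte[]`. The sketch below demonstrates the loss. The class `Utf8Lossy` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo: decoding binary data as UTF-8 and re-encoding it loses bytes.
public class Utf8Lossy {

    // Mimics a UTF-8 string decode of a record value, then tries to get the bytes back.
    static boolean survivesUtf8RoundTrip(byte[] data) {
        String decoded = new String(data, StandardCharsets.UTF_8); // malformed input -> U+FFFD
        byte[] reencoded = decoded.getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(data, reencoded);
    }

    public static void main(String[] args) throws IOException {
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(text);
        }

        System.out.println(survivesUtf8RoundTrip(text));              // true
        System.out.println(survivesUtf8RoundTrip(buf.toByteArray())); // false
    }
}
```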
    

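Naturally, the next step was to look under the hood. Here is the deserialization method in Kafka's StringDeserializer: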

    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
    

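There it is. Since Kafka doesn't ship a GZIP deserializer, just write your own: in your own deserializer based on this method, change

    return new String(data, encoding);

to

    return GZIPUtils.uncompressToString(data, encoding);

(uncompressToString rather than uncompress, because deserialize has to return a String, not a byte[]). Have a smoke, and you're done.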
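Putting that together as a standalone class, rather than editing Kafka's own `StringDeserializer`, might look like the sketch below. It assumes `kafka-clients` is on the classpath and that the decompressed payload is UTF-8 text. The class name `GzipStringDeserializer` is hypothetical. The GZIP logic is inlined so the class does not depend on `GZIPUtils`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPInputStream;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical class: GZIP-decompresses each record value, then decodes it as UTF-8.
public class GzipStringDeserializer implements Deserializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure; UTF-8 is assumed for the decompressed text.
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // e.g. tombstone records
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new SerializationException("Failed to GZIP-decompress record from topic " + topic, e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```

To use it, register the class on the consumer with `props.put("value.deserializer", GzipStringDeserializer.class.getName());`. Kafka instantiates deserializers reflectively, so the class needs a public no-argument constructor; the implicit default constructor is enough.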

Article link: https://www.haomeiwen.com/subject/zqjhtctx.html