美文网首页
filebeat采集日志到Kafka,Java消费Kafka,转

filebeat采集日志到Kafka,Java消费Kafka,转

作者: 莫夏_b560 | 来源:发表于2019-11-13 20:56 被阅读0次

1、采集到的日志通过kafka可以确定的是json格式,序列化不存在问题

2、kafka消费配置问题排查:发现消费的反序列化是StringDeserializer,所以在转换JSONObject的时候一直报错。

备注:序列化

public class JsonSerialize implements Serializer<JSONObject> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        try {
            return data.toString().getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void close() {
    }
}

反序列化

public class JsonDeserialize implements Deserializer<JSONObject> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public JSONObject deserialize(String topic, byte[] data) {
        JSONObject obj = null;
        try {
            obj = new JSONObject(new String(data,"UTF-8"));
        } catch (JSONException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return obj;
    }

    @Override
    public void close() {
    }
}

相关文章

网友评论

      本文标题:filebeat采集日志到Kafka,Java消费Kafka,转

      本文链接:https://www.haomeiwen.com/subject/brikictx.html