目前常用canal 来订阅mysql 的binlog 然后通过kafka 等消息中间件将记录发送出去,供后续其他应用通过kafka 消费;
在实际应用过程中发现当
canal.mq.flatMessage 配置为false 时,消费端拿到的数据是乱码,原因是
canal.mq.flatMessage 这个配置是canal 会不会将数据以json格式发送,如果false,对应MQ收到的消息为protobuf格式
因此解决方案有2个
- canal.mq.flatMessage 配置改为true;
但是此时需要注意,官方说明 该配置为false 时,相对为true 时,性能有30%-60% 的提升
https://github.com/alibaba/canal/wiki/Canal-MQ-Performance
2.不修改canal.mq.flatMessage 的配置,去canal 源码中查看是怎么处理该数据格式的。
答案在
com.alibaba.otter.canal.connector.kafka.consumer.CanalKafkaConsumer 这个类中。
这个方法在消费数据的时候,判断如果canal.mq.flatMessage 为false ,会调用
MessageUtil.convert(record.value()) 来进行数据格式的转化,
那解决方法就很简单了,把这个工具类copy到自己项目中(连同涉及到的依赖)
maven 依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.4</version>
</dependency>
类
MessageUtil.java
CanalMessageSerializerUtil.java
KafkaMessageDeserializer.java
CommonMessage.java
DateUtil.java
JdbcTypeUtil.java
TimeZone.java
网友评论