上文提到在使用canal同步数据库数据时,发现kafka报消息体过大的错,当时只是调大了kafka的max.message.bytes参数以及canal的相应参数进行简单的处理。但是当时根据业务与监听的表觉得不会出现这么大的消息体,于是深入分析原因。
查看kafka接收记录
通过 bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test 命令模拟消费线上数据,发现出现我们的canal.instance.filter.regex白名单配置根本没生效,我们收到了未配置的表数据,继续分析数据
{"data":null,"database":"","es":1583405669000,"id":15601,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"update TABLE_STAT set FILE_SIZE=FILE_SIZE+0, FILE_COUNT=FILE_COUNT+0, DELETE_COUNT=DELETE_COUNT+0, UPDATE_COUNT=UPDATE_COUNT+1, INSERT_COUNT=INSERT_COUNT+0, GMT_MODIFIED=now() WHERE PIPELINE_ID=7 and DATA_MEDIA_PAIR_ID=56","sqlType":null,"table":"TABLE_STAT","ts":1583405670037,"type":"QUERY"}
可以看到这条数据的type是QUERY,对应的是数据库的 binlog_rows_query_log_events 事件(我们由于其他业务开启了该事件),该事件可以获取sql,于是我们看canal源码该事件对应的parse处理
private Entry parseRowsQueryEvent(RowsQueryLogEvent event) {
// mysql5.6支持,需要设置binlog-rows-query-log-events=1,可详细打印原始DML语句
String queryString = null;
try { queryString = new String(event.getRowsQuery().getBytes(ISO_8859_1), charset.name());
String tableName = null;
if (useDruidDdlFilter) {
List<DdlResult> results = DruidDdlParser.parse(queryString, null);
if (results.size() > 0) {
tableName = results.get(0).getTableName();
}
}
return buildQueryEntry(queryString, event.getHeader(), tableName);
} catch (UnsupportedEncodingException e) { t
hrow new CanalParseException(e);
}
}
可以看到该事件canal的处理没有做任何过滤,猜想原因是因为获取不到库名所以不能根据白名单过滤,所以白名单过滤了数据变化的时间,但是这个事件的数据依然能够通过canal发送,导致我们一些不想监听的一些业务表数据通过kafka发送,且这些信息带有sql信息,一些sql过大导致了一些大数据。
解决方案
其实当我们的业务不需要sql信息的时候我们可以将源码对应的时间注释掉,然后重新打包发布。
注释代码
case LogEvent.ROWS_QUERY_LOG_EVENT:
return parseRowsQueryEvent((RowsQueryLogEvent) logEvent);
网友评论