Preface: I changed Canal's native Kafka delivery from multi-row to single-row messages purely to simplify my own work; this approach is not suitable for every business. I consume Canal's messages with FlinkSQL and process them from there, and multi-row delivery is very inconvenient for that, so I modified the source to deliver one row per message. While I was at it, I also reshaped the message body to match the format I previously got when pulling messages with the TCP client mode.
canal源码下载地址: https://github.com/alibaba/canal/archive/canal-1.1.4.tar.gz
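The core of the change can be sketched independently of Canal's own classes: the stock FlatMessage bundles every row of a binlog event into one Kafka message (its data field is a list of row maps), while the modified path emits one message per row. A minimal illustration of that split, using hypothetical record types standing in for the real Canal classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SplitDemo {

    // Hypothetical stand-ins for Canal's FlatMessage / FlatMessageNew.
    record Batched(long id, List<Map<String, String>> data) {}
    record Single(long id, Map<String, String> data) {}

    // One batched message with N rows becomes N single-row messages.
    static List<Single> split(Batched batched) {
        List<Single> out = new ArrayList<>();
        for (Map<String, String> row : batched.data()) {
            out.add(new Single(batched.id(), row));
        }
        return out;
    }

    public static void main(String[] args) {
        Batched b = new Batched(7L, List.of(
                Map.of("id", "1", "name", "a"),
                Map.of("id", "2", "name", "b")));
        // 2 rows in -> 2 single-row messages out
        System.out.println(split(b).size()); // 2
    }
}
```

With one row per message, a FlinkSQL JSON source can map each Kafka record straight to one table row, which is what makes this format convenient downstream.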
Two modules are changed: canal-protocol and canal-server.
- protocol module:
The change is small: one new class, FlatMessageNew, added so the original code stays untouched:
public class FlatMessageNew implements Serializable {

    private static final long serialVersionUID = -3386650678735860050L;

    private long                id;
    private String              dbname;
    private String              tablename;
    private String              op;
    // binlog executeTime
    private Long                binlogTime;
    // dml build timestamp
    private Long                sysExecuteTime;
    // version number, uniquely identifying one binlog record; how it is
    // assigned will be covered in a separate article
    private Long                version;
    private String              sql;
    private List<String>        keyFields;
    private Boolean             isDdl;
    private Map<String, String> data;
    private Map<String, String> beforeChange;
    private Set<String>         updateFields;

    public FlatMessageNew() {
    }

    public FlatMessageNew(long id) {
        this.id = id;
    }

    // getters and setters omitted
}
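For reference, a single UPDATE event serialized from this class (via fastjson with WriteMapNullValue, as in the producer code below) might look roughly like the following; all field values here are illustrative, not taken from a real binlog:

```json
{
  "id": 123,
  "dbname": "test_db",
  "tablename": "user",
  "op": "UPDATE",
  "binlogTime": 1600000000000,
  "sysExecuteTime": 1600000000123,
  "version": null,
  "sql": "",
  "keyFields": ["id"],
  "isDdl": false,
  "data": {"id": "1", "name": "new_name"},
  "beforeChange": {"name": "old_name"},
  "updateFields": ["name"]
}
```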
- server module
Since I only use Kafka delivery, only the Kafka path is changed. Two classes are touched: CanalKafkaProducer and MQMessageUtils.
- CanalKafkaProducer.java — in the private void send() method, replace the uses of FlatMessage with FlatMessageNew:
if (...) {
    // ... unchanged branch
} else {
    // send flat-format JSON, one record per row
    List<FlatMessageNew> flatMessages = MQMessageUtils.messageConverterNew(message);
    // List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
    List<ProducerRecord> records = new ArrayList<ProducerRecord>();
    if (flatMessages != null) {
        for (FlatMessageNew flatMessage : flatMessages) {
            if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                // compute the target partition number
                int partition = MQMessageUtils.messagePartitionNew(flatMessage,
                    canalDestination.getPartitionsNum(),
                    canalDestination.getPartitionHash());
                records.add(new ProducerRecord<String, String>(topicName,
                    partition,
                    null,
                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
            } else {
                final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                records.add(new ProducerRecord<String, String>(topicName,
                    partition,
                    null,
                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
            }
            // flush after every record
            produce(topicName, records, true);
            records.clear();
        }
    }
}
- MQMessageUtils.java — the messageConverterNew method:
/**
 * Convert a Message into FlatMessageNew instances — one per row, mainly to
 * match the one-by-one format previously used with the TCP client mode.
 *
 * @param message the raw message
 * @return list of FlatMessageNew, one per row change
 */
public static List<FlatMessageNew> messageConverterNew(Message message) {
    try {
        if (message == null) {
            return null;
        }

        List<FlatMessageNew> flatMessages = new ArrayList<>();
        List<CanalEntry.Entry> entrys = null;
        if (message.isRaw()) {
            List<ByteString> rawEntries = message.getRawEntries();
            entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
            for (ByteString byteString : rawEntries) {
                CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
                entrys.add(entry);
            }
        } else {
            entrys = message.getEntries();
        }

        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
                    + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            if (!rowChange.getIsDdl()) {
                // sqlType / mysqlType are not needed in this format
                // Map<String, Integer> sqlType = new LinkedHashMap<>();
                // Map<String, String> mysqlType = new LinkedHashMap<>();
                Set<String> updateSet = new HashSet<>();
                boolean hasInitPkNames = false;
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    // one FlatMessageNew per row
                    FlatMessageNew flatMessage = new FlatMessageNew(message.getId());
                    flatMessages.add(flatMessage);
                    flatMessage.setDbname(entry.getHeader().getSchemaName());
                    flatMessage.setTablename(entry.getHeader().getTableName());
                    flatMessage.setIsDdl(rowChange.getIsDdl());
                    flatMessage.setOp(eventType.toString());
                    flatMessage.setBinlogTime(entry.getHeader().getExecuteTime());
                    flatMessage.setSysExecuteTime(System.currentTimeMillis());
                    // version via idWorker — covered in a later article
                    // flatMessage.setVersion(idWorker.nextId());
                    flatMessage.setSql(rowChange.getSql());
                    if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                        && eventType != CanalEntry.EventType.DELETE) {
                        continue;
                    }

                    // Map<String, String> data = new LinkedHashMap<>();
                    // Map<String, String> old = new LinkedHashMap<>();
                    Map<String, String> row = new LinkedHashMap<>();
                    List<CanalEntry.Column> columns;
                    if (eventType == CanalEntry.EventType.DELETE) {
                        columns = rowData.getBeforeColumnsList();
                    } else {
                        columns = rowData.getAfterColumnsList();
                    }

                    for (CanalEntry.Column column : columns) {
                        if (!hasInitPkNames && column.getIsKey()) {
                            flatMessage.addPkName(column.getName());
                        }
                        if (column.getIsNull()) {
                            row.put(column.getName(), null);
                        } else {
                            row.put(column.getName(), column.getValue());
                        }
                        // collect the columns whose updated flag is true
                        if (column.getUpdated()) {
                            updateSet.add(column.getName());
                        }
                    }
                    hasInitPkNames = true;
                    if (!row.isEmpty()) {
                        flatMessage.setData(row);
                    }

                    if (eventType == CanalEntry.EventType.UPDATE) {
                        Map<String, String> rowOld = new LinkedHashMap<>();
                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                            if (updateSet.contains(column.getName())) {
                                if (column.getIsNull()) {
                                    rowOld.put(column.getName(), null);
                                } else {
                                    rowOld.put(column.getName(), column.getValue());
                                }
                            }
                        }
                        // for UPDATE, record the pre-update values
                        if (!rowOld.isEmpty()) {
                            flatMessage.setBeforeChange(rowOld);
                        }
                        flatMessage.setUpdateFields(updateSet);
                    }
                }
            }
        }
        return flatMessages;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
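The UPDATE handling above boils down to: collect the column names flagged as updated on the after-image, then keep only those columns from the before-image. That logic can be reproduced with plain collections — here Column is a minimal hypothetical stand-in, not Canal's protobuf class:

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BeforeChangeDemo {

    // Minimal stand-in for CanalEntry.Column: name, value, updated flag.
    record Column(String name, String value, boolean updated) {}

    // Mirrors the loop in messageConverterNew: keep the pre-update value
    // only for columns whose `updated` flag is set on the after-image.
    static Map<String, String> beforeChange(List<Column> before, List<Column> after) {
        Set<String> updated = new LinkedHashSet<>();
        for (Column c : after) {
            if (c.updated()) {
                updated.add(c.name());
            }
        }
        Map<String, String> old = new LinkedHashMap<>();
        for (Column c : before) {
            if (updated.contains(c.name())) {
                old.put(c.name(), c.value());
            }
        }
        return old;
    }
}
```

So an UPDATE that only touches `name` yields a beforeChange map containing just the old `name` value, never the untouched columns.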
- MQMessageUtils.java — the messagePartitionNew method:
/**
 * Compute the target partition for a FlatMessageNew by hashing the
 * configured key fields.
 *
 * @param flatMessage the message
 * @param partitionsNum number of partitions
 * @param pkHashConfigs hash mapping configuration
 * @return the partition number
 */
public static int messagePartitionNew(FlatMessageNew flatMessage, Integer partitionsNum, String pkHashConfigs) {
    if (partitionsNum == null) {
        partitionsNum = 1;
    }
    int partition;
    if (flatMessage.getIsDdl()) {
        partition = 0;
    } else {
        if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
            String database = flatMessage.getDbname();
            String table = flatMessage.getTablename();
            HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
            if (hashMode == null) {
                // no rule matched: send to the first partition
                partition = 0;
            } else if (hashMode.tableHash) {
                int hashCode = table.hashCode();
                int pkHash = Math.abs(hashCode) % partitionsNum;
                // Math.abs can still return a negative value (Integer.MIN_VALUE),
                // so take abs again; the data still lands on a fixed partition,
                // which preserves consumption order
                partition = Math.abs(pkHash);
                // partitionMessages[pkHash] = flatMessage;
            } else {
                List<String> pkNames = hashMode.pkNames;
                if (hashMode.autoPkHash) {
                    pkNames = flatMessage.getKeyFields();
                }
                Map<String, String> row = flatMessage.getData();
                int hashCode = database.hashCode();
                if (pkNames != null) {
                    for (String pkName : pkNames) {
                        String value = row.get(pkName);
                        if (value == null) {
                            value = "";
                        }
                        hashCode = hashCode ^ value.hashCode();
                    }
                }
                int pkHash = Math.abs(hashCode) % partitionsNum;
                // Math.abs can still return a negative value, so take abs again
                partition = Math.abs(pkHash);
            }
        } else {
            // query events from statement/mixed binlog format
            partition = 0;
        }
    }
    return partition;
}
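Stripped of the HashMode plumbing, the pk-hash branch above is just: XOR the database name's hashCode with the hashCode of each key column's value, then map the result into [0, partitionsNum). A self-contained sketch of that branch:

```java
import java.util.List;
import java.util.Map;

public class PartitionDemo {

    // Mirrors the pk-hash branch of messagePartitionNew: the same row key
    // always lands on the same partition, preserving per-row order.
    static int partitionFor(String database, Map<String, String> row,
                            List<String> pkNames, int partitionsNum) {
        int hashCode = database.hashCode();
        for (String pkName : pkNames) {
            String value = row.get(pkName);
            if (value == null) {
                value = "";
            }
            hashCode = hashCode ^ value.hashCode();
        }
        // Math.abs(Integer.MIN_VALUE) is itself negative, so apply abs
        // again after the modulo, exactly as the original code does.
        return Math.abs(Math.abs(hashCode) % partitionsNum);
    }
}
```

Because the hash depends only on the database name and the key values, all changes to one primary key are consumed from one partition, in order.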
- Rebuild the server and protocol modules and replace the original jars with the new ones — that is all it takes.
The End