Canal 1.1.4: Single-Message Delivery to Kafka

Author: 小胡子哥灬 | Published 2020-06-30 15:44

    Preface: I changed Canal's native Kafka delivery from multi-row messages to single-row messages purely to simplify my own work; this approach is not suitable for every business case. I consume Canal's output with FlinkSQL and then process it, and multi-row messages are very awkward there, so I modified the source to deliver one message per row. While I was at it, I also reshaped the message body to match the format I previously received when pulling messages with the TCP client mode.

    Canal source download: https://github.com/alibaba/canal/archive/canal-1.1.4.tar.gz

    Two modules are changed: canal-protocol and canal-server.

    1. The protocol module:
      The change is small: one new class, FlatMessageNew, added so the original code stays untouched:
    public class FlatMessageNew implements Serializable {
    
        private static final long         serialVersionUID = -3386650678735860050L;
        private long                      id;
        private String                    dbname;
        private String                    tablename;
        private String                    op;
        // binlog executeTime
        private Long                      binlogTime;
        // dml build timeStamp
        private Long                      sysExecuteTime;
       // version number, uniquely identifies a binlog record; how it is assigned will be covered in a separate article
        private Long                      version;
        private String                    sql;
        private List<String>              keyFields;
        private Boolean                   isDdl;
        private Map<String, String>       data;
        private Map<String, String>       beforeChange;
        private Set<String>               updateFields;
    
        public FlatMessageNew() {
        }
    
        public FlatMessageNew(long id){
            this.id = id;
        }
    // getters and setters omitted
    }
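For reference, a single UPDATE row serialized with SerializerFeature.WriteMapNullValue (as in the producer code below) could look like the following. All field values here are illustrative, not taken from a real binlog:

```json
{
  "id": 123,
  "dbname": "testdb",
  "tablename": "user",
  "op": "UPDATE",
  "binlogTime": 1593499200000,
  "sysExecuteTime": 1593499201000,
  "version": null,
  "sql": "",
  "keyFields": ["id"],
  "isDdl": false,
  "data": {"id": "1", "name": "bob"},
  "beforeChange": {"name": "alice"},
  "updateFields": ["name"]
}
```

Because every message carries exactly one row, a FlinkSQL consumer can map each Kafka record to one change event without unpacking nested row arrays.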
    
    2. The server module
      Since I only deliver via Kafka, only the Kafka path is changed. Two classes are touched:
      • In CanalKafkaProducer.java, the private void send() method: replace the places that use FlatMessage with FlatMessageNew
    if() {
      .....
    } else {
            // send the flattened JSON payload
                List<FlatMessageNew> flatMessages = MQMessageUtils.messageConverterNew(message, idWorker);
    //            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
                List<ProducerRecord> records = new ArrayList<ProducerRecord>();
                if (flatMessages != null) {
                    for (FlatMessageNew flatMessage : flatMessages) {
                        if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        // returns the partition number
                            int partition = MQMessageUtils.messagePartitionNew(flatMessage, canalDestination.getPartitionsNum(), canalDestination.getPartitionHash());
                            records.add(new ProducerRecord<String, String>(topicName,
                                partition,
                                null,
                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
                        } else {
                            final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                            records.add(new ProducerRecord<String, String>(topicName,
                                partition,
                                null,
                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
                        }
    
                    // flush after each record, so every row is sent as its own message
                        produce(topicName, records, true);
                        records.clear();
                    }
                }
            }
    
    • The MQMessageUtils.messageConverterNew method
    /**
     * Convert a Message into FlatMessageNew objects, one per row, matching the
     * one-row-at-a-time format previously received in TCP client mode.
     *
     * @param message the raw message
     * @param idWorker id generator for the version field (assignment is currently commented out below)
     * @return list of FlatMessageNew
     */
    public static List<FlatMessageNew> messageConverterNew(Message message, IdWorker idWorker) {
            try {
                if (message == null) {
                    return null;
                }
    
                List<FlatMessageNew> flatMessages = new ArrayList<>();
                List<CanalEntry.Entry> entrys = null;
                if (message.isRaw()) {
                    List<ByteString> rawEntries = message.getRawEntries();
                    entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
                    for (ByteString byteString : rawEntries) {
                        CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
                        entrys.add(entry);
                    }
                } else {
                    entrys = message.getEntries();
                }
    
                for (CanalEntry.Entry entry : entrys) {
                    if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                        continue;
                    }
    
                    CanalEntry.RowChange rowChange;
                    try {
                        rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    } catch (Exception e) {
                        throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
                            + entry.toString(), e);
                    }
    
                    CanalEntry.EventType eventType = rowChange.getEventType();
    
                    if (!rowChange.getIsDdl()) {
                // sqlType and mysqlType are not needed
    //                    Map<String, Integer> sqlType = new LinkedHashMap<>();
    //                    Map<String, String> mysqlType = new LinkedHashMap<>();
                        Set<String> updateSet = new HashSet<>();
                        boolean hasInitPkNames = false;
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            FlatMessageNew flatMessage = new FlatMessageNew(message.getId());
                            flatMessages.add(flatMessage);
                            flatMessage.setDbname(entry.getHeader().getSchemaName());
                            flatMessage.setTablename(entry.getHeader().getTableName());
                            flatMessage.setIsDdl(rowChange.getIsDdl());
                            flatMessage.setOp(eventType.toString());
                            flatMessage.setBinlogTime(entry.getHeader().getExecuteTime());
                            flatMessage.setSysExecuteTime(System.currentTimeMillis());
                            // idworker: version assignment, covered in a separate article
                            // flatMessage.setVersion(idWorker.nextId());
                            flatMessage.setSql(rowChange.getSql());
    
                            if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                                && eventType != CanalEntry.EventType.DELETE) {
                                continue;
                            }
    //                        Map<String, String> data = new LinkedHashMap<>();
    
    //                        Map<String, String> old = new LinkedHashMap<>();
                            Map<String, String> row = new LinkedHashMap<>();
                            List<CanalEntry.Column> columns;
    
                            if (eventType == CanalEntry.EventType.DELETE) {
                                columns = rowData.getBeforeColumnsList();
                            } else {
                                columns = rowData.getAfterColumnsList();
                            }
    
                            for (CanalEntry.Column column : columns) {
                                if (!hasInitPkNames && column.getIsKey()) {
                                    flatMessage.addPkName(column.getName());
                                }
                                if (column.getIsNull()) {
                                    row.put(column.getName(), null);
                                } else {
                                    row.put(column.getName(), column.getValue());
                                }
                            // collect the columns whose updated flag is true
                                if (column.getUpdated()) {
                                    updateSet.add(column.getName());
                                }
                            }
    
                            hasInitPkNames = true;
                            if (!row.isEmpty()) {
                                flatMessage.setData(row);
                            }
    
                            if (eventType == CanalEntry.EventType.UPDATE) {
                                Map<String, String> rowOld = new LinkedHashMap<>();
                                for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                                    if (updateSet.contains(column.getName())) {
                                        if (column.getIsNull()) {
                                            rowOld.put(column.getName(), null);
                                        } else {
                                            rowOld.put(column.getName(), column.getValue());
                                        }
                                    }
                                }
                            // for UPDATE events, capture the pre-change values
                                if (!rowOld.isEmpty()) {
                                    flatMessage.setBeforeChange(rowOld);
                                }
                                flatMessage.setUpdateFields(updateSet);
                            }
                        }
    
                    }
                }
                return flatMessages;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
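The UPDATE branch above collects the changed columns into updateFields and their previous values into beforeChange. The stand-alone sketch below illustrates that logic with plain maps; it is not Canal code. In the real converter the changed flag comes from column.getUpdated(), while here it is simulated by comparing before/after values, and the class and method names are hypothetical:

```java
import java.util.*;

public class UpdateDiffSketch {

    // Simulate column.getUpdated(): a column counts as updated when its
    // after-image value differs from its before-image value.
    public static Set<String> updatedColumns(Map<String, String> before,
                                             Map<String, String> after) {
        Set<String> updated = new HashSet<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            if (!Objects.equals(e.getValue(), before.get(e.getKey()))) {
                updated.add(e.getKey());
            }
        }
        return updated;
    }

    // Mirror of the beforeChange construction: keep only the pre-change
    // values of the columns that were actually updated.
    public static Map<String, String> beforeChange(Map<String, String> before,
                                                   Set<String> updateFields) {
        Map<String, String> old = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : before.entrySet()) {
            if (updateFields.contains(e.getKey())) {
                old.put(e.getKey(), e.getValue());
            }
        }
        return old;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "1");
        before.put("name", "alice");
        Map<String, String> after = new LinkedHashMap<>();
        after.put("id", "1");
        after.put("name", "bob");

        Set<String> updated = updatedColumns(before, after);
        System.out.println(updated);                       // [name]
        System.out.println(beforeChange(before, updated)); // {name=alice}
    }
}
```

This is also why beforeChange only ever contains the changed columns: a downstream consumer that needs the full before-image must reconstruct it from data plus beforeChange.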
    
    • The MQMessageUtils.messagePartitionNew method
    /**
     * Compute the target Kafka partition for a FlatMessageNew by hashing the
     * configured key fields.
     *
     * @param flatMessage the message
     * @param partitionsNum number of partitions
     * @param pkHashConfigs pk-hash mapping config (db.table:pk rules)
     * @return the partition number
     */
        public static int messagePartitionNew(FlatMessageNew flatMessage, Integer partitionsNum, String pkHashConfigs) {
            if (partitionsNum == null) {
                partitionsNum = 1;
            }
            int partition;
    
            if (flatMessage.getIsDdl()) {
                partition = 0;
            } else {
                if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
                    String database = flatMessage.getDbname();
                    String table = flatMessage.getTablename();
                    HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
                    if (hashMode == null) {
                        // no hash rule matched; send to the first partition
                        partition = 0;
                    } else if (hashMode.tableHash) {
                        int hashCode = table.hashCode();
                        int pkHash = Math.abs(hashCode) % partitionsNum;
                    // Math.abs can return a negative value (Math.abs(Integer.MIN_VALUE) < 0), so take abs
                    // again after the modulo; such rows still land on a fixed partition, preserving order
                        partition = Math.abs(pkHash);
    //                    partitionMessages[pkHash] = flatMessage;
                    } else {
                        List<String> pkNames = hashMode.pkNames;
                        if (hashMode.autoPkHash) {
                            pkNames = flatMessage.getKeyFields();
                        }
    
                        int idx = 0;
                        Map<String, String> row = flatMessage.getData();
                        int hashCode = database.hashCode();
                        if (pkNames != null) {
                            for (String pkName : pkNames) {
                                String value = row.get(pkName);
                                if (value == null) {
                                    value = "";
                                }
                                hashCode = hashCode ^ value.hashCode();
                            }
                        }
    
                        int pkHash = Math.abs(hashCode) % partitionsNum;
                    // Math.abs can return a negative value (Math.abs(Integer.MIN_VALUE) < 0), so take abs
                    // again after the modulo; such rows still land on a fixed partition, preserving order
                        partition = Math.abs(pkHash);
                    }
                } else {
                // query events from statement/mixed binlog format
                    partition = 0;
                }
            }
            return partition;
        }
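The double Math.abs guard deserves a closer look: Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE (negative), so a single abs before the modulo is not enough. A minimal self-contained sketch of the pk-hash computation, with hypothetical class and method names:

```java
public class PartitionHashSketch {

    // Hash the database name XOR'd with each pk value, then reduce to a
    // partition index. The second Math.abs covers the Integer.MIN_VALUE
    // edge case, where the first abs is still negative.
    public static int partitionFor(String database, String[] pkValues, int partitionsNum) {
        int hashCode = database.hashCode();
        for (String value : pkValues) {
            // null pk values hash like the empty string, as in the original loop
            hashCode = hashCode ^ (value == null ? "".hashCode() : value.hashCode());
        }
        int pkHash = Math.abs(hashCode) % partitionsNum;
        return Math.abs(pkHash);
    }

    public static void main(String[] args) {
        int p = partitionFor("testdb", new String[]{"42"}, 8);
        System.out.println(p >= 0 && p < 8); // true

        // Even at the Integer.MIN_VALUE edge the result stays in range:
        int edge = Math.abs(Math.abs(Integer.MIN_VALUE) % 8);
        System.out.println(edge >= 0 && edge < 8); // true
    }
}
```

Rows sharing the same pk values always hash to the same partition, which is what keeps per-key consumption order intact across the single-message stream.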
    
    3. Repackage the server and protocol modules and overwrite the original jars; then it is ready to use.

    The End


    Original link: https://www.haomeiwen.com/subject/bpjxqktx.html