美文网首页架构大数据
利用Canal投递MySQL Binlog到Kafka

利用Canal投递MySQL Binlog到Kafka

作者: LittleMagic | 来源:发表于2019-02-09 21:30 被阅读1266次

    Update:
    Canal与Camus的结合使用,见https://www.jianshu.com/p/4c4213385368

    Canal是阿里开源的一个比较有名的Java中间件,主要作用是接入数据库(MySQL)的binlog日志,实现数据的增量订阅、解析与消费,即CDC(Change Data Capture)。近期我们计划将数据仓库由基于Sqoop的离线按天入库方式改为近实时入库,Canal自然是非常符合需求的。
    Canal的模块设计精妙,但代码质量低,阅读起来比较困难。在其GitHub Wiki中详细叙述了其设计思路,值得学习,这里不再赘述,参见:https://github.com/alibaba/canal/wiki/Introduction
    在最新的Canal 1.1.x版本中,其新增了对消息队列的原生支持,通过不算复杂的配置可以直接将binlog投递到Kafka或者RocketMQ,无需再自己写producer程序(源码中有现成的CanalKafkaProducer和CanalRocketMQProducer类)。
    我们使用目前的稳定版本1.1.2小试一下。

    Canal最简单原理示意

    前置工作

    • 保证MySQL的binlog-format=ROW
    • 为canal用户配置MySQL slave的权限
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    

    canal.properties设置

    顺便还可以复习一下Kafka producer的一些配置参数含义。

    # 默认值tcp,这里改为投递到Kafka
    canal.serverMode = kafka
    # Kafka bootstrap.servers,可以不用写上全部的brokers
    canal.mq.servers = 10.10.99.132:9092,10.10.99.133:9092,10.10.99.134:9092,10.10.99.135:9092
    # 投递失败的重试次数,默认0,改为2
    canal.mq.retries = 2
    # Kafka batch.size,即producer一个微批次的大小,默认16K,这里加倍
    canal.mq.batchSize = 32768
    # Kafka max.request.size,即一个请求的最大大小,默认1M,这里也加倍
    canal.mq.maxRequestSize = 2097152
    # Kafka linger.ms,即sender线程在检查微批次是否就绪时的超时,默认0ms,这里改为200ms
    # 满足batch.size和linger.ms其中之一,就会发送消息
    canal.mq.lingerMs = 200
    # Kafka buffer.memory,缓存大小,默认32M
    canal.mq.bufferMemory = 33554432
    # 获取binlog数据的批次大小,默认50
    canal.mq.canalBatchSize = 50
    # 获取binlog数据的超时时间,默认200ms
    canal.mq.canalGetTimeout = 200
    # 是否将binlog转为JSON格式。如果为false,就是原生Protobuf格式
    canal.mq.flatMessage = true
    # 压缩类型,官方文档没有描述
    canal.mq.compressionType = none
    # Kafka acks,默认all,表示分区leader会等所有follower同步完才给producer发送ack
    # 0表示不等待ack,1表示leader写入完毕之后直接ack
    canal.mq.acks = all
    # Kafka消息投递是否使用事务
    # 主要针对flatMessage的异步发送和动态多topic消息投递进行事务控制来保持和Canal binlog位置的一致性
    # flatMessage模式下建议开启
    canal.mq.transaction = true
    

    instance.properties设置

    # 需要接入binlog的表名,支持正则,但这里手动指定了每张表,注意转义
    canal.instance.filter.regex=mall\\.address,mall\\.orders,mall\\.order_product,mall\\.product,mall\\.mall_category,mall\\.mall_comment,mall\\.mall_goods_category,mall\\.mall_goods_info,mall\\.mall_goods_wish,mall\\.mall_new_tags_v2,mall\\.mall_topic,mall\\.mall_topic_goods,mall\\.mall_user_cart_info
    # 黑名单
    canal.instance.filter.black.regex=
    # 消息队列对应topic名
    canal.mq.topic=binlog_mall_1
    # 发送到哪一个分区,由于下面用hash做分区,因此不设
    #canal.mq.partition=0
    # 根据正则表达式做动态topic,目前采用单topic,因此也不设
    #canal.mq.dynamicTopic=mall\\..*
    # 10个分区
    canal.mq.partitionsNum=10
    # 各个表的主键,依照主键来做hash分区
    canal.mq.partitionHash=mall.address:address_id,mall.orders:order_id,mall.order_product:order_product_id,mall.product:product_id,mall.mall_category:category_id,mall.mall_comment:comment_id,mall.mall_goods_category:goods_category_id,mall.mall_goods_info:goods_id,mall.mall_goods_wish:id,mall.mall_new_tags_v2:tags_id,mall.mall_topic:topic_id,mall.mall_topic_goods:id,mall.mall_user_cart_info:id
    

    上面的配置相当灵活,dynamicTopic选项可以控制单topic还是多topic,partitionHash选项可以控制单partition还是多partition。
    但是binlog是有序的,必须保证它进入到消息队列之后仍然有序。参照以上的配置,有以下几个方法:

    • 单topic单partition:可以严格保证与binlog相同的顺序,但效率比较低,TPS只有2~3K。
    • 多topic单partition:由于是按照表划分topic,因此可以保证表级别的有序性,但是每个表的热度不一样,对于热点表仍然会有性能问题。
    • 单/多topic多partition:按照给定的hash方法来划分partition,性能无疑是最好的。但必须要多加小心,每个表的hash依据都必须是其主键或者主键组。只有保证每表每主键binlog的顺序性,才能准确恢复变动数据。
      经过权衡,我们采用单topic多partition的方式来处理。还可以参考:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

    Kafka版本兼容性

    通过阅读Canal工程中的pom文件,得知它集成的Kafka版本为1.1.1,而我们的集群中,之前为了兼容一些老旧业务,采用的Kafka版本为0.8.2。起初我们做试验时,消息能够正常发送,但topic中始终没有任何消息。
    这是因为在0.10.2版本之前,Kafka只对客户端版本有向前兼容性,亦即高版本broker能够处理低版本client的请求,但低版本broker不能处理高版本client的请求。0.10.2版本提出了双向兼容性(bidirectional compatibility)改进,低版本broker与高版本client也能兼容了,但仍然对过时的0.8.x版本没有支持。
    鉴于1.1.1版本producer发送的消息不能被0.8.2版本的broker解析,后来我们索性将Kafka broker全部升级到了1.0.1(对应CDH Kafka版本为3.1.1,是目前最新的),兼容性问题就解决了。
    另外Kafka自带有命令行工具kafka-broker-api-versions.sh来检测broker支持的API版本,这里不表。

    Canal 1.1.2源码中的一处小bug

    一切配置好后,运行bin/startup.sh启动Canal,观察canal.log,发现疯狂报空指针异常,如下图所示。


    大量NPE

    通过仔细观察,发现对于类型为UPDATE的消息没有问题,但一旦触发INSERT就跪掉了。
    继续追根溯源,找到com.alibaba.otter.canal.protocol.FlatMessage类中,有一个messagePartition()方法,显然是做hash分区用的,其前半段源码如下,已经改正确了:

        public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum,
                                                     Map<String, String> pkHashConfig) {
            if (partitionsNum == null) {
                partitionsNum = 1;
            }
            FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
            String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
            if (pk == null || flatMessage.getIsDdl()) {
                partitionMessages[0] = flatMessage;
            } else {
                if (flatMessage.getData() != null) {
                    int idx = 0;
                    for (Map<String, String> row : flatMessage.getData()) {
                        String value = null;
                        if (flatMessage.getOld() != null) {
                            // [!]
                            Map<String, String> o = flatMessage.getOld().get(idx);
                            // String value;
                            // 如果old中有pk值说明主键有修改, 以旧的主键值hash为准
                            if (o != null && o.containsKey(pk)) {
                                value = o.get(pk);
                            }
                        }
                        if (value == null) {
                            value = row.get(pk);
                        }
                        if (value == null) {
                            value = "";
                        }
                        int hash = value.hashCode();
                        int pkHash = Math.abs(hash) % partitionsNum;
    ................
    

    注意上面代码中打[!]标记的地方,原有的代码根本没有对flatMessage.getOld()的结果做空校验,而INSERT操作恰好又没有变动之前的记录信息,自然就会产生NPE了。对于当前一个稳定版本release而言,代码中出现低级错误实属不该。
    修正这个bug之后,将canal.protocol模块重新打成jar包,替换掉原有deployer包中的同名文件,问题解决。

    附上Canal内部Kafka producer类的实现源码

    从中可以看出,producer还没有启用事务性,也就是说上面的canal.mq.transactions配置项其实是无效的。

    public class CanalKafkaProducer implements CanalMQProducer {
        private static final Logger       logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
        private Producer<String, Message> producer;
        private Producer<String, String>  producer2;                                                 // 用于扁平message的数据投递
        private MQProperties              kafkaProperties;
    
        @Override
        public void init(MQProperties kafkaProperties) {
            this.kafkaProperties = kafkaProperties;
            Properties properties = new Properties();
            properties.put("bootstrap.servers", kafkaProperties.getServers());
            properties.put("acks", kafkaProperties.getAcks());
            properties.put("compression.type", kafkaProperties.getCompressionType());
            properties.put("retries", kafkaProperties.getRetries());
            properties.put("batch.size", kafkaProperties.getBatchSize());
            properties.put("linger.ms", kafkaProperties.getLingerMs());
            properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
            properties.put("buffer.memory", kafkaProperties.getBufferMemory());
            properties.put("key.serializer", StringSerializer.class.getName());
            if (!kafkaProperties.getFlatMessage()) {
                properties.put("value.serializer", MessageSerializer.class.getName());
                producer = new KafkaProducer<String, Message>(properties);
            } else {
                properties.put("value.serializer", StringSerializer.class.getName());
                producer2 = new KafkaProducer<String, String>(properties);
            }
    
            // producer.initTransactions();
        }
    
        @Override
        public void stop() {
            try {
                logger.info("## stop the kafka producer");
                if (producer != null) {
                    producer.close();
                }
                if (producer2 != null) {
                    producer2.close();
                }
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping kafka producer:", e);
            } finally {
                logger.info("## kafka producer is down.");
            }
        }
    
        @Override
        public void send(MQProperties.CanalDestination canalDestination, Message message, Callback callback) {
    
            // producer.beginTransaction();
            if (!kafkaProperties.getFlatMessage()) {
                try {
                    ProducerRecord<String, Message> record;
                    if (canalDestination.getPartition() != null) {
                        record = new ProducerRecord<String, Message>(canalDestination.getTopic(),
                            canalDestination.getPartition(),
                            null,
                            message);
                    } else {
                        record = new ProducerRecord<String, Message>(canalDestination.getTopic(), 0, null, message);
                    }
    
                    producer.send(record).get();
    
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send  message to kafka topic: [{}], packet: {}",
                            canalDestination.getTopic(),
                            message.toString());
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    // producer.abortTransaction();
                    callback.rollback();
                    return;
                }
            } else {
                // 发送扁平数据json
                List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
                if (flatMessages != null) {
                    for (FlatMessage flatMessage : flatMessages) {
                        if (canalDestination.getPartition() != null) {
                            try {
                                ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                                    canalDestination.getTopic(),
                                    canalDestination.getPartition(),
                                    null,
                                    JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                                producer2.send(record).get();
                            } catch (Exception e) {
                                logger.error(e.getMessage(), e);
                                // producer.abortTransaction();
                                callback.rollback();
                                return;
                            }
                        } else {
                            if (canalDestination.getPartitionHash() != null
                                && !canalDestination.getPartitionHash().isEmpty()) {
                                FlatMessage[] partitionFlatMessage = FlatMessage.messagePartition(flatMessage,
                                    canalDestination.getPartitionsNum(),
                                    canalDestination.getPartitionHash());
                                int length = partitionFlatMessage.length;
                                for (int i = 0; i < length; i++) {
                                    FlatMessage flatMessagePart = partitionFlatMessage[i];
                                    if (flatMessagePart != null) {
                                        try {
                                            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                                                canalDestination.getTopic(),
                                                i,
                                                null,
                                                JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue));
                                            producer2.send(record).get();
                                        } catch (Exception e) {
                                            logger.error(e.getMessage(), e);
                                            // producer.abortTransaction();
                                            callback.rollback();
                                            return;
                                        }
                                    }
                                }
                            } else {
                                try {
                                    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                                        canalDestination.getTopic(),
                                        0,
                                        null,
                                        JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                                    producer2.send(record).get();
                                } catch (Exception e) {
                                    logger.error(e.getMessage(), e);
                                    // producer.abortTransaction();
                                    callback.rollback();
                                    return;
                                }
                            }
                        }
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send flat message to kafka topic: [{}], packet: {}",
                                canalDestination.getTopic(),
                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue));
                        }
                    }
                }
            }
    
            // producer.commitTransaction();
            callback.commit();
        }
    }
    

    相关文章

      网友评论

        本文标题:利用Canal投递MySQL Binlog到Kafka

        本文链接:https://www.haomeiwen.com/subject/vrfzsqtx.html