美文网首页MySQLali开源组件
阿里开源Canal--④投递数据到Kafka

阿里开源Canal--④投递数据到Kafka

作者: 撸码小丑 | 来源:发表于2019-02-15 17:40 被阅读24次

    基本说明

    canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

    1.配置修改

    基于前面第二章节内容我们搭建的canal server(Billow自编译版本为1.1.3,支持动态Topic配置),我们在instance的配置文件中做配置的修改。

    1.1.修改instance 配置文件

    vi conf/example/instance.properties
    
    #  按需修改成自己的数据库信息
    #################################################
    ...
    canal.instance.master.address=192.168.1.20:3306
    # username/password,数据库的用户名和密码
    ...
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    ...
    # mq config
    canal.mq.topic=example
    # 针对库名或者表名发送动态topic
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #库名.表名: 唯一主键,多个表之间用逗号分隔
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    

    对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考前面Billow发的文章:

    dynamicTopic规则: 表达式如果只有库名则匹配库名的数据都会发送到对应名称topic, 如果是库名.表名则匹配的数据会发送到以'库名_表名'为名称的topic。如要指定topic名称,则可以配置:

    canal.mq.dynamicTopic=examp2:.*;exmaple3:mytest\\..*,mytest2\\..*;example4:mytest3.user
    
    

    以topic名 ':' 正则规则作为配置, 多个topic配置之间以 ';'隔开, message会发送到所有符合规则的topic

    1.2.修改canal 配置文件

    vi /usr/local/canal/conf/canal.properties
    
    # ...
    # 可选项: tcp(默认), kafka, RocketMQ
    canal.serverMode = kafka
    # ...
    # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
    canal.mq.servers = 127.0.0.1:6667
    canal.mq.retries = 0
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    canal.mq.lingerMs = 1
    canal.mq.bufferMemory = 33554432
    # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
    canal.mq.canalBatchSize = 50
    # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
    canal.mq.canalGetTimeout = 100
    # 是否为flat json格式对象
    canal.mq.flatMessage = true
    canal.mq.compressionType = none
    canal.mq.acks = all
    # kafka消息投递是否使用事务
    canal.mq.transaction = false
    

    mq相关参数说明


    canal.mq.dynamicTopic 表达式说明

    canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号分隔

    • 例子1:test\.test 指定匹配的单表,发送到以 test_test为名字的topic上
    • 例子2:.\..* 匹配所有表,每个表都会发送到各自表名的topic上
    • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
    • 例子4:test\.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
    • 例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

    支持指定topic名称匹配, 配置格式:topicName:schema 或 schema.table,多个配置之间使用逗号分隔, 多组之间使用 ; 分隔

    • 例子:test:test,test1\.test1;test2:test2,test3\.test1 针对匹配的表会发送到指定的topic上

    大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

    表达式说明

    canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

    • 例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
    • 例子2:.\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
    • 例子3:.\..*:pk 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
    • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
    • 例子5:.\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
      • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
    • 例子6: test\.test:id,.\..* , 针对test的表按照id散列,其余的表按照table散列

    注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

    mq顺序性问题

    binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

    1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
    2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
    • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
    • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
    1. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
    • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
    • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
    • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

    1.4.启动

    cd /usr/local/canal/
    sh bin/startup.sh
    
    

    1.5.查看日志

    a.查看 logs/canal/canal.log

    vi logs/canal/canal.log
    
    

    b. 查看instance的日志:

    vi logs/example/example.log
    
    

    1.6 关闭

    cd /usr/local/canal/
    sh bin/stop.sh
    
    

    1.7.MQ数据消费

    canal源码中有实例代码;如下

    public class CanalKafkaClientExample {
    
        protected final static Logger           logger  = LoggerFactory.getLogger(CanalKafkaClientExample.class);
    
        private KafkaCanalConnector             connector;
    
        private static volatile boolean         running = false;
    
        private Thread                          thread  = null;
    
        private Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
    
                                                            public void uncaughtException(Thread t, Throwable e) {
                                                                logger.error("parse events has an error", e);
                                                            }
                                                        };
    
        public CanalKafkaClientExample(String zkServers, String servers, String topic, Integer partition, String groupId){
            connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
        }
    
        public static void main(String[] args) {
            try {
                final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(
                    AbstractKafkaTest.zkServers,
                    AbstractKafkaTest.servers,
                    AbstractKafkaTest.topic,
                    AbstractKafkaTest.partition,
                    AbstractKafkaTest.groupId);
                logger.info("## start the kafka consumer: {}-{}", AbstractKafkaTest.topic, AbstractKafkaTest.groupId);
                kafkaCanalClientExample.start();
                logger.info("## the canal kafka consumer is running now ......");
                Runtime.getRuntime().addShutdownHook(new Thread() {
    
                    public void run() {
                        try {
                            logger.info("## stop the kafka consumer");
                            kafkaCanalClientExample.stop();
                        } catch (Throwable e) {
                            logger.warn("##something goes wrong when stopping kafka consumer:", e);
                        } finally {
                            logger.info("## kafka consumer is down.");
                        }
                    }
    
                });
                while (running)
                    ;
            } catch (Throwable e) {
                logger.error("## Something goes wrong when starting up the kafka consumer:", e);
                System.exit(0);
            }
        }
    
        public void start() {
            Assert.notNull(connector, "connector is null");
            thread = new Thread(new Runnable() {
    
                public void run() {
                    process();
                }
            });
            thread.setUncaughtExceptionHandler(handler);
            thread.start();
            running = true;
        }
    
        public void stop() {
            if (!running) {
                return;
            }
            running = false;
            if (thread != null) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    
        private void process() {
            while (!running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
    
            while (running) {
                try {
                    connector.connect();
                    connector.subscribe();
                    while (running) {
                        try {
                            List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
                            if (messages == null) {
                                continue;
                            }
                            for (Message message : messages) {
                                long batchId = message.getId();
                                int size = message.getEntries().size();
                                if (batchId == -1 || size == 0) {
                                    // try {
                                    // Thread.sleep(1000);
                                    // } catch (InterruptedException e) {
                                    // }
                                } else {
                                    // printSummary(message, batchId, size);
                                    // printEntry(message.getEntries());
                                    logger.info(message.toString());
                                }
                            }
    
                            connector.ack(); // 提交确认
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
    
            try {
                connector.unsubscribe();
            } catch (WakeupException e) {
                // No-op. Continue process
            }
            connector.disconnect();
        }
    }
    

    相关文章

      网友评论

        本文标题:阿里开源Canal--④投递数据到Kafka

        本文链接:https://www.haomeiwen.com/subject/jmgkeqtx.html