美文网首页技术文
异地多活的基础设施构建-数据订阅canal深入(一)

异地多活的基础设施构建-数据订阅canal深入(一)

作者: 酱君挺怎样 | 来源:发表于2018-08-11 16:48 被阅读12次

    概要

    当今互联网越来越发达,对于各企业来说,服务的容灾越来越重要,系统一段时间的不可用,都有可能造成企业极大的损失和影响。特别是一些跟钱相关的业务场景,一段时间的不可用,都有可能对用户产生严重的信任问题。像银行这种的业务,国家甚至还规定了全年系统不可用的时间阈值。

    传统的同城双机房,同城服务提供服务,数据写入到某个机房(数据库单点写);到两地三中心,其中一个异地中心做冷备(这种情况下当要发生切换时,由于冷备一直没提供服务,导致流量不敢切换)。到最后演变到现在的异地多活(每个中心都读写)。

    可以看出来,容灾方案的发展对基础设施的依赖越来越严重。没有完善的基础设施,想做异地多活,有可能造成各种各样的问题,到最后只会疲于修复问题。

    本文不针对异地多活中的某个实际产品来展开,只是取出某些技术细节来阐述。最后把谢串联起来,再分析我们异地多活中的各种产品。

    首先我们想一下,要做异地多活,必然涉及到数据的两边写,那么涉及到数据的两边写,最终必然遇到的一个问题就是数据多中心的融合。这也是做异地多活最重要、最复杂的一个环节,因为上层业务的不可用,总有办法做修复,但是数据的不可用(数据错乱等)有可能就会造成业务的混乱,这也许就不是那么容易恢复了。

    在做关于底层数据的产品前,我们需要首先明确自己的产品目标。目标是要做一款数据迁移产品,还是想做一款数据订阅变更的产品,甚至是一款数据同步的产品。首先确定好自己的产品形态,就数据迁移和数据订阅,实际上他们实现的思路几乎完全不一样。虽然最终他们可以融合成类似于阿里云的DTS这类的产品,但是在内部实现上,应该首先把他们隔离。否则在产品实施过程中,会顾此失彼,导致两边都做不好。

    既然我们要做类似数据订阅、数据同步的产品(在这暂不考虑数据迁移),首先就涉及到需要把变更的数据获取出来。因此就从这开始,由下往上慢慢推出异地多活的整体形态~

    获取数据变更的方式,我们知道mysql有binlog的机制,实际上变更的数据都在此。因此我们有2中方案可以获取到数据的变更操作:

    1. 监听binlog文件的变更
    2. 模拟slave来获取数据的变更

    第一种方案监听binlog文件的变更,需要与mysql服务部署在同样的机器中,强耦合在一起,后续扩展能力也比较弱。而第二种方案,模拟slave,实际上只需要网络的打通,无须与mysql耦合,并且现在已经有足够多的开源产品实现此功能:

    1. LinkedIn 开源的 databus
    2. 阿里开源的 Canal
    3. 大众点评的 Puma
    4. mysql-binlog-connector-java

    本文先从应用比较广泛的 Canal 了解一下其源码以及实现。

    架构

    canal架构图.png

    以上canal在github上的架构图,包含了一个canal中的各个模块:

    • eventParser
    • eventSink
    • eventStore
    • metaManager

    深入了解 Canal 订阅机制

    1. Instance的启动

    每次我们启动 Canal,实际就是启动一个 Instance 对象。

    每次启动,调用AbstractCanalInstance的start方法,AbstractCanalInstance是各 Instance 的基类,其实现有CanalInstanceWithSpring、CanalInstanceWithManager。

    @Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }
    
        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }
    
        if (!eventStore.isStart()) {
            eventStore.start();
        }
    
        if (!eventSink.isStart()) {
            eventSink.start();
        }
    
        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }
    

    可以从以上代码看出,instance对象持有eventParse、eventSink、eventStore、metaManager并掌握着其生命周期。它是我们整个 Canal 的基础。

    2. EventParser的启动

    EventParser是canal与mysql打交道的进出口,它由 Instance 对象来持有和启动。

    AbstractEventParser是基类,实现有MysqlEventParser(模拟slave)和LocalBinLogEventParser(本地binlog读取)

    实际上启动EventParser时,做了以下几个步骤:

    1. 初始化一个ringBuffer的缓冲队列(用于存放binlog event数据)
    2. 初始化过滤规则eventFilter,实际为binlogParser对象(用于过滤数据)
    3. 初始化连接、校验数据库是否符合模拟slave拉取规则
    4. 启动心跳线程(可用于检测canal是否假死状态)
    5. 获取最后消费的binlog(若没有则从show master status开始)
    6. 初始化 sinkHandler 用处处理binlog数据(拉取的event数据并作库表的黑白名单过滤后放入ringBuffer中,由ringbuffer交给eventSink)
    7. 开始dump数据(将数据交给sinkHandler)

    当 Instance 启动时,调用 eventParser 的 start 方法

    public void start() {
        // 初始化操作
        ......
    
        // 配置transaction buffer
        // 初始化缓冲队列
        transactionBuffer.setBufferSize(transactionSize);// 设置buffer大小
        transactionBuffer.start();
    
        // 构造bin log parser : 该方法中构造配置文件中的内容,如过滤规则等
        binlogParser = buildParser();
        binlogParser.start();
    
    
        // 启动工作线程
        parseThread = new Thread(new Runnable() {
    
            public void run() {
    
                // 初始化变量
                ......
    
                while (running) { // 循环重试创建dump任务
                    try {
    
                        // 开始执行replication
                        // 1. 构造Erosa连接:通过配置文件构造mysql连接,底层通过netty去连接
                        erosaConnection = buildErosaConnection();
    
                        // 2. 启动一个心跳线程(若配置文件中设置detectingSQL则会使用该sql检测)
                        // 用于在即使没有binlog数据时,仍然与mysql保持随时连通状态(从而可以通过监控该连通状态判断canal节点是否假死)
                        // 若配置文件没有设置detect,则默认没有心跳检测,系统会伪造一个心跳包扭转给sink模块
                        startHeartBeat(erosaConnection);
    
                        // 3. 执行dump前的准备工作:校验是否支持的binlog-format等
                        preDump(erosaConnection);
    
                        erosaConnection.connect();// 链接
    
    
                        // 4. 获取最后的位置信息
                        // 1) 先从自身获取(内存、磁盘、zk等) 2)配置文件中获取 3)连接远程show master status获取 4)若没有配置journalName,则根据配置timestamp获取(连接找出所有binlog,根据时间来找到对应位置) 5)若配置journaName,优先匹配journalName + position,没有position在匹配journamName + timestamp
                        EntryPosition position = findStartPosition(erosaConnection);
                        final EntryPosition startPosition = position;
                        if (startPosition == null) {
                            throw new CanalParseException("can't find start position for " + destination);
                        }
                        logger.info("find start position : {}", startPosition.toString());
    
    
    
                        // 重新链接,因为在找position过程中可能有状态,需要断开后重建
                        erosaConnection.reconnect();
    
                        final SinkFunction sinkHandler = new SinkFunction<EVENT>() {
                            // 定制处理类
                            ......
                        };
    
                        // 4. 开始dump数据,交给connection循环dump数据
                        if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
                            erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                        } else {
                            erosaConnection.dump(startPosition.getJournalName(),
                                startPosition.getPosition(),
                                sinkHandler);
                        }
    
                    } catch (Exception e) {
                      ......
                    } finally {
                      ......
                    }
    
                    // 出异常了,退出sink消费,释放一下状态:由于进入dump的循环,只有dump出现异常才会走到此
                    eventSink.interrupt();
                    transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
                    binlogParser.reset();// 重新置位
    
                    if (running) {
                        // sleep一段时间再进行重试
                        try {
                            Thread.sleep(10000 + RandomUtils.nextInt(10000));
                        } catch (InterruptedException e) {
                        }
                    }
                }
            }
        });
    
        ......
    }
    

    注意:

    1. eventParser拉取数据后,将数据存放于一个ringBuffer中,而非直接交给eventSink。在交给ringBuffer时,flush方法会回调将event交给eventSink
    2. 当配置文件中没有设置detectSQL的语句,心跳是一个伪造的心跳包,而非真是心跳包

    以下是启动一个心跳线程,定时将其提交到sink中(consumeTheEventAndProfilingIfNecessary方法)

    protected void startHeartBeat(ErosaConnection connection) {
        ...... // 初始化timer
        if (heartBeatTimerTask == null) {
            heartBeatTimerTask = buildHeartBeatTimeTask(connection);
            Integer interval = detectingIntervalInSeconds;
            timer.schedule(heartBeatTimerTask, interval * 1000L, interval * 1000L);
        }
    }
    
    protected TimerTask buildHeartBeatTimeTask(ErosaConnection connection) {
        return new TimerTask() {
    
            public void run() {
                try {
                    if (exception == null || lastEntryTime > 0) {
                        // 如果未出现异常,或者有第一条正常数据
                        long now = System.currentTimeMillis();
                        long inteval = (now - lastEntryTime) / 1000;
                        if (inteval >= detectingIntervalInSeconds) {
                            ...... // 伪造一个心跳包entry
                            Entry entry = entryBuilder.build();
    
                            // 提交到sink中,目前不会提交到store中,会在sink中进行忽略
                            consumeTheEventAndProfilingIfNecessary(Arrays.asList(entry));
                        }
                    }
                } catch (Throwable e) {
                    logger.warn("heartBeat run failed " + ExceptionUtils.getStackTrace(e));
                }
            }
    
        };
    }
    

    以下是eventBuffer添加一个event时候的回调方法,同样当EventParser start后,接受到数据以后,会把数据交给sinkHandler,sinkHandler拿到数据后加入到eventBuffer中,并回调以下方法

    public AbstractEventParser(){
        // 初始化一下
        transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {
    
            public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
                boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
    
                ......
            }
        });
    }
    

    可以从上面看出来,心跳包和binlog的数据包,均通过consumeTheEventAndProfilingIfNecessary这个方法来将数据交给EventSink,至此EventParser的任务基本是完成。


    EventParser模块.png

    至此,数据从mysql binglog获取的整体流程是完成了。后续将会介绍关于数据过滤(eventSink)、数据存储(eventStore)、位置点管理(metaManager)的模块。

    相关文章

      网友评论

        本文标题:异地多活的基础设施构建-数据订阅canal深入(一)

        本文链接:https://www.haomeiwen.com/subject/pvmdbftx.html