美文网首页技术文
异地多活的基础设施构建-数据订阅canal深入(一)

异地多活的基础设施构建-数据订阅canal深入(一)

作者: 酱君挺怎样 | 来源:发表于2018-08-11 16:48 被阅读12次

概要

当今互联网越来越发达,对于各企业来说,服务的容灾越来越重要,系统一段时间的不可用,都有可能造成企业极大的损失和影响。特别是一些跟钱相关的业务场景,一段时间的不可用,都有可能对用户产生严重的信任问题。像银行这种的业务,国家甚至还规定了全年系统不可用的时间阈值。

传统的同城双机房,同城服务提供服务,数据写入到某个机房(数据库单点写);到两地三中心,其中一个异地中心做冷备(这种情况下当要发生切换时,由于冷备一直没提供服务,导致流量不敢切换)。到最后演变到现在的异地多活(每个中心都读写)。

可以看出来,容灾方案的发展对基础设施的依赖越来越严重。没有完善的基础设施,想做异地多活,有可能造成各种各样的问题,到最后只会疲于修复问题。

本文不针对异地多活中的某个实际产品来展开,只是取出某些技术细节来阐述。最后把谢串联起来,再分析我们异地多活中的各种产品。

首先我们想一下,要做异地多活,必然涉及到数据的两边写,那么涉及到数据的两边写,最终必然遇到的一个问题就是数据多中心的融合。这也是做异地多活最重要、最复杂的一个环节,因为上层业务的不可用,总有办法做修复,但是数据的不可用(数据错乱等)有可能就会造成业务的混乱,这也许就不是那么容易恢复了。

在做关于底层数据的产品前,我们需要首先明确自己的产品目标。目标是要做一款数据迁移产品,还是想做一款数据订阅变更的产品,甚至是一款数据同步的产品。首先确定好自己的产品形态,就数据迁移和数据订阅,实际上他们实现的思路几乎完全不一样。虽然最终他们可以融合成类似于阿里云的DTS这类的产品,但是在内部实现上,应该首先把他们隔离。否则在产品实施过程中,会顾此失彼,导致两边都做不好。

既然我们要做类似数据订阅、数据同步的产品(在这暂不考虑数据迁移),首先就涉及到需要把变更的数据获取出来。因此就从这开始,由下往上慢慢推出异地多活的整体形态~

获取数据变更的方式,我们知道mysql有binlog的机制,实际上变更的数据都在此。因此我们有2中方案可以获取到数据的变更操作:

  1. 监听binlog文件的变更
  2. 模拟slave来获取数据的变更

第一种方案监听binlog文件的变更,需要与mysql服务部署在同样的机器中,强耦合在一起,后续扩展能力也比较弱。而第二种方案,模拟slave,实际上只需要网络的打通,无须与mysql耦合,并且现在已经有足够多的开源产品实现此功能:

  1. LinkedIn 开源的 databus
  2. 阿里开源的 Canal
  3. 大众点评的 Puma
  4. mysql-binlog-connector-java

本文先从应用比较广泛的 Canal 了解一下其源码以及实现。

架构

canal架构图.png

以上canal在github上的架构图,包含了一个canal中的各个模块:

  • eventParser
  • eventSink
  • eventStore
  • metaManager

深入了解 Canal 订阅机制

1. Instance的启动

每次我们启动 Canal,实际就是启动一个 Instance 对象。

每次启动,调用AbstractCanalInstance的start方法,AbstractCanalInstance是各 Instance 的基类,其实现有CanalInstanceWithSpring、CanalInstanceWithManager。

@Override
public void start() {
    super.start();
    if (!metaManager.isStart()) {
        metaManager.start();
    }

    if (!alarmHandler.isStart()) {
        alarmHandler.start();
    }

    if (!eventStore.isStart()) {
        eventStore.start();
    }

    if (!eventSink.isStart()) {
        eventSink.start();
    }

    if (!eventParser.isStart()) {
        beforeStartEventParser(eventParser);
        eventParser.start();
        afterStartEventParser(eventParser);
    }
    logger.info("start successful....");
}

可以从以上代码看出,instance对象持有eventParse、eventSink、eventStore、metaManager并掌握着其生命周期。它是我们整个 Canal 的基础。

2. EventParser的启动

EventParser是canal与mysql打交道的进出口,它由 Instance 对象来持有和启动。

AbstractEventParser是基类,实现有MysqlEventParser(模拟slave)和LocalBinLogEventParser(本地binlog读取)

实际上启动EventParser时,做了以下几个步骤:

  1. 初始化一个ringBuffer的缓冲队列(用于存放binlog event数据)
  2. 初始化过滤规则eventFilter,实际为binlogParser对象(用于过滤数据)
  3. 初始化连接、校验数据库是否符合模拟slave拉取规则
  4. 启动心跳线程(可用于检测canal是否假死状态)
  5. 获取最后消费的binlog(若没有则从show master status开始)
  6. 初始化 sinkHandler 用处处理binlog数据(拉取的event数据并作库表的黑白名单过滤后放入ringBuffer中,由ringbuffer交给eventSink)
  7. 开始dump数据(将数据交给sinkHandler)

当 Instance 启动时,调用 eventParser 的 start 方法

public void start() {
    // 初始化操作
    ......

    // 配置transaction buffer
    // 初始化缓冲队列
    transactionBuffer.setBufferSize(transactionSize);// 设置buffer大小
    transactionBuffer.start();

    // 构造bin log parser : 该方法中构造配置文件中的内容,如过滤规则等
    binlogParser = buildParser();
    binlogParser.start();


    // 启动工作线程
    parseThread = new Thread(new Runnable() {

        public void run() {

            // 初始化变量
            ......

            while (running) { // 循环重试创建dump任务
                try {

                    // 开始执行replication
                    // 1. 构造Erosa连接:通过配置文件构造mysql连接,底层通过netty去连接
                    erosaConnection = buildErosaConnection();

                    // 2. 启动一个心跳线程(若配置文件中设置detectingSQL则会使用该sql检测)
                    // 用于在即使没有binlog数据时,仍然与mysql保持随时连通状态(从而可以通过监控该连通状态判断canal节点是否假死)
                    // 若配置文件没有设置detect,则默认没有心跳检测,系统会伪造一个心跳包扭转给sink模块
                    startHeartBeat(erosaConnection);

                    // 3. 执行dump前的准备工作:校验是否支持的binlog-format等
                    preDump(erosaConnection);

                    erosaConnection.connect();// 链接


                    // 4. 获取最后的位置信息
                    // 1) 先从自身获取(内存、磁盘、zk等) 2)配置文件中获取 3)连接远程show master status获取 4)若没有配置journalName,则根据配置timestamp获取(连接找出所有binlog,根据时间来找到对应位置) 5)若配置journaName,优先匹配journalName + position,没有position在匹配journamName + timestamp
                    EntryPosition position = findStartPosition(erosaConnection);
                    final EntryPosition startPosition = position;
                    if (startPosition == null) {
                        throw new CanalParseException("can't find start position for " + destination);
                    }
                    logger.info("find start position : {}", startPosition.toString());



                    // 重新链接,因为在找position过程中可能有状态,需要断开后重建
                    erosaConnection.reconnect();

                    final SinkFunction sinkHandler = new SinkFunction<EVENT>() {
                        // 定制处理类
                        ......
                    };

                    // 4. 开始dump数据,交给connection循环dump数据
                    if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
                        erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                    } else {
                        erosaConnection.dump(startPosition.getJournalName(),
                            startPosition.getPosition(),
                            sinkHandler);
                    }

                } catch (Exception e) {
                  ......
                } finally {
                  ......
                }

                // 出异常了,退出sink消费,释放一下状态:由于进入dump的循环,只有dump出现异常才会走到此
                eventSink.interrupt();
                transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
                binlogParser.reset();// 重新置位

                if (running) {
                    // sleep一段时间再进行重试
                    try {
                        Thread.sleep(10000 + RandomUtils.nextInt(10000));
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    });

    ......
}

注意:

  1. eventParser拉取数据后,将数据存放于一个ringBuffer中,而非直接交给eventSink。在交给ringBuffer时,flush方法会回调将event交给eventSink
  2. 当配置文件中没有设置detectSQL的语句,心跳是一个伪造的心跳包,而非真是心跳包

以下是启动一个心跳线程,定时将其提交到sink中(consumeTheEventAndProfilingIfNecessary方法)

protected void startHeartBeat(ErosaConnection connection) {
    ...... // 初始化timer
    if (heartBeatTimerTask == null) {
        heartBeatTimerTask = buildHeartBeatTimeTask(connection);
        Integer interval = detectingIntervalInSeconds;
        timer.schedule(heartBeatTimerTask, interval * 1000L, interval * 1000L);
    }
}

protected TimerTask buildHeartBeatTimeTask(ErosaConnection connection) {
    return new TimerTask() {

        public void run() {
            try {
                if (exception == null || lastEntryTime > 0) {
                    // 如果未出现异常,或者有第一条正常数据
                    long now = System.currentTimeMillis();
                    long inteval = (now - lastEntryTime) / 1000;
                    if (inteval >= detectingIntervalInSeconds) {
                        ...... // 伪造一个心跳包entry
                        Entry entry = entryBuilder.build();

                        // 提交到sink中,目前不会提交到store中,会在sink中进行忽略
                        consumeTheEventAndProfilingIfNecessary(Arrays.asList(entry));
                    }
                }
            } catch (Throwable e) {
                logger.warn("heartBeat run failed " + ExceptionUtils.getStackTrace(e));
            }
        }

    };
}

以下是eventBuffer添加一个event时候的回调方法,同样当EventParser start后,接受到数据以后,会把数据交给sinkHandler,sinkHandler拿到数据后加入到eventBuffer中,并回调以下方法

public AbstractEventParser(){
    // 初始化一下
    transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

        public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
            boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);

            ......
        }
    });
}

可以从上面看出来,心跳包和binlog的数据包,均通过consumeTheEventAndProfilingIfNecessary这个方法来将数据交给EventSink,至此EventParser的任务基本是完成。


EventParser模块.png

至此,数据从mysql binglog获取的整体流程是完成了。后续将会介绍关于数据过滤(eventSink)、数据存储(eventStore)、位置点管理(metaManager)的模块。

相关文章

网友评论

    本文标题:异地多活的基础设施构建-数据订阅canal深入(一)

    本文链接:https://www.haomeiwen.com/subject/pvmdbftx.html