概要
当今互联网越来越发达,对于各企业来说,服务的容灾越来越重要,系统一段时间的不可用,都有可能造成企业极大的损失和影响。特别是一些跟钱相关的业务场景,一段时间的不可用,都有可能对用户产生严重的信任问题。像银行这种的业务,国家甚至还规定了全年系统不可用的时间阈值。
传统的同城双机房,同城服务提供服务,数据写入到某个机房(数据库单点写);到两地三中心,其中一个异地中心做冷备(这种情况下当要发生切换时,由于冷备一直没提供服务,导致流量不敢切换)。到最后演变到现在的异地多活(每个中心都读写)。
可以看出来,容灾方案的发展对基础设施的依赖越来越严重。没有完善的基础设施,想做异地多活,有可能造成各种各样的问题,到最后只会疲于修复问题。
本文不针对异地多活中的某个实际产品来展开,只是取出某些技术细节来阐述。最后把谢串联起来,再分析我们异地多活中的各种产品。
首先我们想一下,要做异地多活,必然涉及到数据的两边写,那么涉及到数据的两边写,最终必然遇到的一个问题就是数据多中心的融合。这也是做异地多活最重要、最复杂的一个环节,因为上层业务的不可用,总有办法做修复,但是数据的不可用(数据错乱等)有可能就会造成业务的混乱,这也许就不是那么容易恢复了。
在做关于底层数据的产品前,我们需要首先明确自己的产品目标。目标是要做一款数据迁移产品,还是想做一款数据订阅变更的产品,甚至是一款数据同步的产品。首先确定好自己的产品形态,就数据迁移和数据订阅,实际上他们实现的思路几乎完全不一样。虽然最终他们可以融合成类似于阿里云的DTS这类的产品,但是在内部实现上,应该首先把他们隔离。否则在产品实施过程中,会顾此失彼,导致两边都做不好。
既然我们要做类似数据订阅、数据同步的产品(在这暂不考虑数据迁移),首先就涉及到需要把变更的数据获取出来。因此就从这开始,由下往上慢慢推出异地多活的整体形态~
获取数据变更的方式,我们知道mysql有binlog的机制,实际上变更的数据都在此。因此我们有2中方案可以获取到数据的变更操作:
- 监听binlog文件的变更
- 模拟slave来获取数据的变更
第一种方案监听binlog文件的变更,需要与mysql服务部署在同样的机器中,强耦合在一起,后续扩展能力也比较弱。而第二种方案,模拟slave,实际上只需要网络的打通,无须与mysql耦合,并且现在已经有足够多的开源产品实现此功能:
- LinkedIn 开源的 databus
- 阿里开源的 Canal
- 大众点评的 Puma
- mysql-binlog-connector-java
本文先从应用比较广泛的 Canal 了解一下其源码以及实现。
架构

以上canal在github上的架构图,包含了一个canal中的各个模块:
- eventParser
- eventSink
- eventStore
- metaManager
深入了解 Canal 订阅机制
1. Instance的启动
每次我们启动 Canal,实际就是启动一个 Instance 对象。
每次启动,调用AbstractCanalInstance的start方法,AbstractCanalInstance是各 Instance 的基类,其实现有CanalInstanceWithSpring、CanalInstanceWithManager。
@Override
public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();
}
if (!alarmHandler.isStart()) {
alarmHandler.start();
}
if (!eventStore.isStart()) {
eventStore.start();
}
if (!eventSink.isStart()) {
eventSink.start();
}
if (!eventParser.isStart()) {
beforeStartEventParser(eventParser);
eventParser.start();
afterStartEventParser(eventParser);
}
logger.info("start successful....");
}
可以从以上代码看出,instance对象持有eventParse、eventSink、eventStore、metaManager并掌握着其生命周期。它是我们整个 Canal 的基础。
2. EventParser的启动
EventParser是canal与mysql打交道的进出口,它由 Instance 对象来持有和启动。
AbstractEventParser是基类,实现有MysqlEventParser(模拟slave)和LocalBinLogEventParser(本地binlog读取)
实际上启动EventParser时,做了以下几个步骤:
- 初始化一个ringBuffer的缓冲队列(用于存放binlog event数据)
- 初始化过滤规则eventFilter,实际为binlogParser对象(用于过滤数据)
- 初始化连接、校验数据库是否符合模拟slave拉取规则
- 启动心跳线程(可用于检测canal是否假死状态)
- 获取最后消费的binlog(若没有则从show master status开始)
- 初始化 sinkHandler 用处处理binlog数据(拉取的event数据并作库表的黑白名单过滤后放入ringBuffer中,由ringbuffer交给eventSink)
- 开始dump数据(将数据交给sinkHandler)
当 Instance 启动时,调用 eventParser 的 start 方法
public void start() {
// 初始化操作
......
// 配置transaction buffer
// 初始化缓冲队列
transactionBuffer.setBufferSize(transactionSize);// 设置buffer大小
transactionBuffer.start();
// 构造bin log parser : 该方法中构造配置文件中的内容,如过滤规则等
binlogParser = buildParser();
binlogParser.start();
// 启动工作线程
parseThread = new Thread(new Runnable() {
public void run() {
// 初始化变量
......
while (running) { // 循环重试创建dump任务
try {
// 开始执行replication
// 1. 构造Erosa连接:通过配置文件构造mysql连接,底层通过netty去连接
erosaConnection = buildErosaConnection();
// 2. 启动一个心跳线程(若配置文件中设置detectingSQL则会使用该sql检测)
// 用于在即使没有binlog数据时,仍然与mysql保持随时连通状态(从而可以通过监控该连通状态判断canal节点是否假死)
// 若配置文件没有设置detect,则默认没有心跳检测,系统会伪造一个心跳包扭转给sink模块
startHeartBeat(erosaConnection);
// 3. 执行dump前的准备工作:校验是否支持的binlog-format等
preDump(erosaConnection);
erosaConnection.connect();// 链接
// 4. 获取最后的位置信息
// 1) 先从自身获取(内存、磁盘、zk等) 2)配置文件中获取 3)连接远程show master status获取 4)若没有配置journalName,则根据配置timestamp获取(连接找出所有binlog,根据时间来找到对应位置) 5)若配置journaName,优先匹配journalName + position,没有position在匹配journamName + timestamp
EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
if (startPosition == null) {
throw new CanalParseException("can't find start position for " + destination);
}
logger.info("find start position : {}", startPosition.toString());
// 重新链接,因为在找position过程中可能有状态,需要断开后重建
erosaConnection.reconnect();
final SinkFunction sinkHandler = new SinkFunction<EVENT>() {
// 定制处理类
......
};
// 4. 开始dump数据,交给connection循环dump数据
if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
} else {
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
sinkHandler);
}
} catch (Exception e) {
......
} finally {
......
}
// 出异常了,退出sink消费,释放一下状态:由于进入dump的循环,只有dump出现异常才会走到此
eventSink.interrupt();
transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
binlogParser.reset();// 重新置位
if (running) {
// sleep一段时间再进行重试
try {
Thread.sleep(10000 + RandomUtils.nextInt(10000));
} catch (InterruptedException e) {
}
}
}
}
});
......
}
注意:
- eventParser拉取数据后,将数据存放于一个ringBuffer中,而非直接交给eventSink。在交给ringBuffer时,flush方法会回调将event交给eventSink
- 当配置文件中没有设置detectSQL的语句,心跳是一个伪造的心跳包,而非真是心跳包
以下是启动一个心跳线程,定时将其提交到sink中(consumeTheEventAndProfilingIfNecessary方法)
protected void startHeartBeat(ErosaConnection connection) {
...... // 初始化timer
if (heartBeatTimerTask == null) {
heartBeatTimerTask = buildHeartBeatTimeTask(connection);
Integer interval = detectingIntervalInSeconds;
timer.schedule(heartBeatTimerTask, interval * 1000L, interval * 1000L);
}
}
protected TimerTask buildHeartBeatTimeTask(ErosaConnection connection) {
return new TimerTask() {
public void run() {
try {
if (exception == null || lastEntryTime > 0) {
// 如果未出现异常,或者有第一条正常数据
long now = System.currentTimeMillis();
long inteval = (now - lastEntryTime) / 1000;
if (inteval >= detectingIntervalInSeconds) {
...... // 伪造一个心跳包entry
Entry entry = entryBuilder.build();
// 提交到sink中,目前不会提交到store中,会在sink中进行忽略
consumeTheEventAndProfilingIfNecessary(Arrays.asList(entry));
}
}
} catch (Throwable e) {
logger.warn("heartBeat run failed " + ExceptionUtils.getStackTrace(e));
}
}
};
}
以下是eventBuffer添加一个event时候的回调方法,同样当EventParser start后,接受到数据以后,会把数据交给sinkHandler,sinkHandler拿到数据后加入到eventBuffer中,并回调以下方法
public AbstractEventParser(){
// 初始化一下
transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {
public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
......
}
});
}
可以从上面看出来,心跳包和binlog的数据包,均通过consumeTheEventAndProfilingIfNecessary这个方法来将数据交给EventSink,至此EventParser的任务基本是完成。

至此,数据从mysql binglog获取的整体流程是完成了。后续将会介绍关于数据过滤(eventSink)、数据存储(eventStore)、位置点管理(metaManager)的模块。
网友评论