canal的模块canal.meta 增量订阅&消费信息管理器 说法很官方口吻,我们来看一下 meta模块是怎么做到的。
首先看一下代码组成部分
包含测试类在内都是非常少的。
上一篇canal源码解析之canal.deployer看到instance实例启动的时候第一个启动的就是 CanalMetaManager
protected CanalMetaManager metaManager;
@Override
public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();
}
…………………………
}
所以先看CanalMetaManager这个类
CanalMetaManager类图这是个接口,实现了接口CanalLifeCycle接口,CanalLifeCycle是生命管理的接口定义,非常简单的三个接口定义
public interface CanalLifeCycle {
void start();
void stop();
boolean isStart();
}
CanalMetaManager接口定义了几个接口方法,比较重要的入口方法是
/**
* 增加一个 client订阅 <br/>
* 如果 client已经存在,则不做任何修改
*/
void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException;
所有客户端连接到canal的时候都会调用这个方法(这个在另一个模块,后续再说)看下实现类
CanalMetaManager接口的实现类还是比较简单。
主要说一下 canal server为每个连接到canal server client实例分配一个 CanalMetaMange,用于记录 客户端上次同步到事件的位置,连接的哪个instance等。主要记录方式无外乎内存,文件,zookeeper,和混合模式,所以就有了这几个实现类
MemoryMetaManager是记录到内存
FileMixedMetaManager是内存和文件的混合模式,策略是: 1. 先写内存,然后定时刷新数据到File 2. 数据采取overwrite模式(只保留最后一次),通过logger实施append模式(记录历史版本)
ZooKeeperMetaManager 基于zk树形节点的记录方式
zk 版本的 canal manager, 存储结构:
/otter
canal
destinations
dest1
client1
filter
batch_mark
1
2
3
MixedMetaManager 是组合memory + zookeeper的使用模式
PeriodMixedMetaManager 是基于定时刷新的策略的mixed实现,做了几个优化2个优化:
- 去除batch数据刷新到zk中,切换时batch数据可忽略,重新从头开始获取
- cursor的更新,启用定时刷新,合并多次请求。如果最近没有变化则不更新
然后我们来看下每种方式是怎么做到的。具体debug 对应的test类即可
两个抽象的测试类
AbstractZkTest 和AbstractMetaManagerTest,AbstractMetaManagerTest继承AbstractZkTest
AbstractZkTest 定义了destion和zk的cluster
AbstractMetaManagerTest 定义了mysql的地址和抽象了几个测试类都会用到
doSubscribeTest、doBatchTest和doCursorTest
先看内存版实现的测试类
MemoryMetaManagerTest
我们可以看出内存版的实现 其实就是存贮到了对应的map中
protected Map<String, List<ClientIdentity>> destinations;
protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
protected Map<ClientIdentity, Position> cursors;
MemoryMetaManager的start方法 初始化了 对应的map
public void start() {
super.start();
batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
return MemoryClientIdentityBatch.create(clientIdentity);
}
});
cursors = new MapMaker().makeMap();
destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {
public List<ClientIdentity> apply(String destination) {
return Lists.newArrayList();
}
});
}
然后模拟了 客户端client的subscribe
public void doSubscribeTest(CanalMetaManager metaManager) {
ClientIdentity client1 = new ClientIdentity(destination, (short) 1);
metaManager.subscribe(client1);
metaManager.subscribe(client1); // 重复调用
ClientIdentity client2 = new ClientIdentity(destination, (short) 2);
metaManager.subscribe(client2);
List<ClientIdentity> clients = metaManager.listAllSubscribeInfo(destination);
Assert.assertEquals(Arrays.asList(client1, client2), clients);
metaManager.unsubscribe(client2);
ClientIdentity client3 = new ClientIdentity(destination, (short) 3);
metaManager.subscribe(client3);
clients = metaManager.listAllSubscribeInfo(destination);
Assert.assertEquals(Arrays.asList(client1, client3), clients);
}
具体是测试多次subscribe不会出现重复的client记录到内存,还有就是 unscrbe的时候是否从内存移除掉,比较简单。
testBatchAll是为了测试为 client 产生一个唯一、递增的id是否会重复之类的问题。
testCursorAll是为了更新 cursor 游标等
其他的类比较类似,主要是存贮的方式不一样,再主要看个文件的和zk的
文件版的subscribe
public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
super.subscribe(clientIdentity);
// 订阅信息频率发生比较低,不需要做定时merge处理
executor.submit(new Runnable() {
public void run() {
flushDataToFile(clientIdentity.getDestination());
}
});
}
看到调用了flushDataToFile方法
private void flushDataToFile(String destination, File dataFile) {
FileMetaInstanceData data = new FileMetaInstanceData();
if (destinations.containsKey(destination)) {
synchronized (destination.intern()) { // 基于destination控制一下并发更新
data.setDestination(destination);
List<FileMetaClientIdentityData> clientDatas = Lists.newArrayList();
List<ClientIdentity> clientIdentitys = destinations.get(destination);
for (ClientIdentity clientIdentity : clientIdentitys) {
FileMetaClientIdentityData clientData = new FileMetaClientIdentityData();
clientData.setClientIdentity(clientIdentity);
Position position = cursors.get(clientIdentity);
if (position != null && position != nullCursor) {
clientData.setCursor((LogPosition) position);
}
clientDatas.add(clientData);
}
data.setClientDatas(clientDatas);
}
String json = JsonUtils.marshalToString(data);
try {
FileUtils.writeStringToFile(dataFile, json);
} catch (IOException e) {
throw new CanalMetaManagerException(e);
}
}
}
看到最后是写到了文件 FileUtils.writeStringToFile(dataFile, json);
看一个比较完整的meta.dat的文件格式
{
"clientDatas": [
{
"clientIdentity": {
"clientId": 1001,
"destination": "example",
"filter": ".*\\..*"
},
"cursor": {
"identity": {
"slaveId": -1,
"sourceAddress": {
"address": "192.168.1.4",
"port": 3306
}
},
"postion": {
"included": false,
"journalName": "mysql-bin.000002",
"position": 10670,
"serverId": 1,
"timestamp": 1481773274000
}
}
}
],
"destination": "example"
}
可以看到其实就是序列化的描述整个canal instance对应数据对象。
在看下zk版本的实现
public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
clientIdentity.getClientId());
try {
zkClientx.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
// ignore
}
if (clientIdentity.hasFilter()) {
String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
byte[] bytes = null;
try {
bytes = clientIdentity.getFilter().getBytes(ENCODE);
} catch (UnsupportedEncodingException e) {
throw new CanalMetaManagerException(e);
}
try {
zkClientx.createPersistent(filterPath, bytes);
} catch (ZkNodeExistsException e) {
// ignore
zkClientx.writeData(filterPath, bytes);
}
}
}
也是非常简单调用了zk客户端代码进行了树形目录的创建和数据写入
zk树形目录创建注意这里创建的树的类型。
写入数据到zk这是调用zkclient代码写入数据。我这里安装了zkui 所以可以看一下数据
zkui web形式查看写入的节点数据这里会有人经常遇到这样的问题就是
canal配置canal.instance.filter.regex无效,这里说一下这个问题
其实可以查看对应的存贮文件meta.dat或者zk上的节点数据
有{"clientIdentity":{"clientId":1001,"destination":"example","filter":".\.."}
所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".\.."),不然等于没设置canal.instance.filter.regex。
如果一定要调用CanalConnector.subscribe(".\.."),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。
总结,其实这个涉及到了设计模式,策略,根据你的配置,是否是基于zk等选择存贮方式,实现对client的管理。
网友评论