canal源码解析之canal.meta

作者: holly_wang_王小飞 | 来源:发表于2016-12-17 14:45 被阅读0次

    canal的模块canal.meta 增量订阅&消费信息管理器 说法很官方口吻,我们来看一下 meta模块是怎么做到的。
    首先看一下代码组成部分

    canal.meta源码部分
    包含测试类在内都是非常少的。
    上一篇canal源码解析之canal.deployer看到instance实例启动的时候第一个启动的就是 CanalMetaManager
    protected CanalMetaManager                       metaManager;
     @Override
        public void start() {
            super.start();
            if (!metaManager.isStart()) {
                metaManager.start();
            }
    …………………………
        }
    

    所以先看CanalMetaManager这个类

    CanalMetaManager类图

    这是个接口,实现了接口CanalLifeCycle接口,CanalLifeCycle是生命管理的接口定义,非常简单的三个接口定义

    public interface CanalLifeCycle {
    
        void start();
    
        void stop();
    
        boolean isStart();
    }
    

    CanalMetaManager接口定义了几个接口方法,比较重要的入口方法是

     /**
         * 增加一个 client订阅 <br/>
         * 如果 client已经存在,则不做任何修改
         */
        void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException;
    
    

    所有客户端连接到canal的时候都会调用这个方法(这个在另一个模块,后续再说)看下实现类

    CanalMetaManager接口的实现类

    还是比较简单。
    主要说一下 canal server为每个连接到canal server client实例分配一个 CanalMetaMange,用于记录 客户端上次同步到事件的位置,连接的哪个instance等。主要记录方式无外乎内存,文件,zookeeper,和混合模式,所以就有了这几个实现类
    MemoryMetaManager是记录到内存
    FileMixedMetaManager是内存和文件的混合模式,策略是: 1. 先写内存,然后定时刷新数据到File 2. 数据采取overwrite模式(只保留最后一次),通过logger实施append模式(记录历史版本)
    ZooKeeperMetaManager 基于zk树形节点的记录方式

    
      zk 版本的 canal manager, 存储结构:
    
      /otter
         canal
           destinations
             dest1 
               client1
                 filter
                 batch_mark
                   1
                   2
                   3
    

    MixedMetaManager 是组合memory + zookeeper的使用模式
    PeriodMixedMetaManager 是基于定时刷新的策略的mixed实现,做了几个优化2个优化:

    1. 去除batch数据刷新到zk中,切换时batch数据可忽略,重新从头开始获取
    2. cursor的更新,启用定时刷新,合并多次请求。如果最近没有变化则不更新

    然后我们来看下每种方式是怎么做到的。具体debug 对应的test类即可
    两个抽象的测试类
    AbstractZkTest 和AbstractMetaManagerTest,AbstractMetaManagerTest继承AbstractZkTest
    AbstractZkTest 定义了destion和zk的cluster
    AbstractMetaManagerTest 定义了mysql的地址和抽象了几个测试类都会用到
    doSubscribeTest、doBatchTest和doCursorTest

    先看内存版实现的测试类
    MemoryMetaManagerTest
    我们可以看出内存版的实现 其实就是存贮到了对应的map中

     protected Map<String, List<ClientIdentity>>              destinations;
     protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
     protected Map<ClientIdentity, Position>                  cursors;
    

    MemoryMetaManager的start方法 初始化了 对应的map

    public void start() {
           super.start();
    
           batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {
    
               public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
                   return MemoryClientIdentityBatch.create(clientIdentity);
               }
    
           });
    
           cursors = new MapMaker().makeMap();
    
           destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {
    
               public List<ClientIdentity> apply(String destination) {
                   return Lists.newArrayList();
               }
           });
       }
    

    然后模拟了 客户端client的subscribe

    public void doSubscribeTest(CanalMetaManager metaManager) {
            ClientIdentity client1 = new ClientIdentity(destination, (short) 1);
            metaManager.subscribe(client1);
            metaManager.subscribe(client1); // 重复调用
            ClientIdentity client2 = new ClientIdentity(destination, (short) 2);
            metaManager.subscribe(client2);
    
            List<ClientIdentity> clients = metaManager.listAllSubscribeInfo(destination);
            Assert.assertEquals(Arrays.asList(client1, client2), clients);
    
            metaManager.unsubscribe(client2);
            ClientIdentity client3 = new ClientIdentity(destination, (short) 3);
            metaManager.subscribe(client3);
    
            clients = metaManager.listAllSubscribeInfo(destination);
            Assert.assertEquals(Arrays.asList(client1, client3), clients);
    
        }
    

    具体是测试多次subscribe不会出现重复的client记录到内存,还有就是 unscrbe的时候是否从内存移除掉,比较简单。
    testBatchAll是为了测试为 client 产生一个唯一、递增的id是否会重复之类的问题。
    testCursorAll是为了更新 cursor 游标等
    其他的类比较类似,主要是存贮的方式不一样,再主要看个文件的和zk的
    文件版的subscribe

    public void subscribe(final ClientIdentity clientIdentity) throws CanalMetaManagerException {
            super.subscribe(clientIdentity);
    
            // 订阅信息频率发生比较低,不需要做定时merge处理
            executor.submit(new Runnable() {
    
                public void run() {
                    flushDataToFile(clientIdentity.getDestination());
                }
            });
        }
    

    看到调用了flushDataToFile方法

    private void flushDataToFile(String destination, File dataFile) {
            FileMetaInstanceData data = new FileMetaInstanceData();
            if (destinations.containsKey(destination)) {
                synchronized (destination.intern()) { // 基于destination控制一下并发更新
                    data.setDestination(destination);
    
                    List<FileMetaClientIdentityData> clientDatas = Lists.newArrayList();
                    List<ClientIdentity> clientIdentitys = destinations.get(destination);
                    for (ClientIdentity clientIdentity : clientIdentitys) {
                        FileMetaClientIdentityData clientData = new FileMetaClientIdentityData();
                        clientData.setClientIdentity(clientIdentity);
                        Position position = cursors.get(clientIdentity);
                        if (position != null && position != nullCursor) {
                            clientData.setCursor((LogPosition) position);
                        }
    
                        clientDatas.add(clientData);
                    }
    
                    data.setClientDatas(clientDatas);
                }
    
                String json = JsonUtils.marshalToString(data);
                try {
                    FileUtils.writeStringToFile(dataFile, json);
                } catch (IOException e) {
                    throw new CanalMetaManagerException(e);
                }
            }
        }
    

    看到最后是写到了文件 FileUtils.writeStringToFile(dataFile, json);
    看一个比较完整的meta.dat的文件格式

    {
        "clientDatas": [
            {
                "clientIdentity": {
                    "clientId": 1001,
                    "destination": "example",
                    "filter": ".*\\..*"
                },
                "cursor": {
                    "identity": {
                        "slaveId": -1,
                        "sourceAddress": {
                            "address": "192.168.1.4",
                            "port": 3306
                        }
                    },
                    "postion": {
                        "included": false,
                        "journalName": "mysql-bin.000002",
                        "position": 10670,
                        "serverId": 1,
                        "timestamp": 1481773274000
                    }
                }
            }
        ],
        "destination": "example"
    }
    

    可以看到其实就是序列化的描述整个canal instance对应数据对象。

    在看下zk版本的实现

    public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
            String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
                clientIdentity.getClientId());
    
            try {
                zkClientx.createPersistent(path, true);
            } catch (ZkNodeExistsException e) {
                // ignore
            }
            if (clientIdentity.hasFilter()) {
                String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
                    clientIdentity.getClientId());
    
                byte[] bytes = null;
                try {
                    bytes = clientIdentity.getFilter().getBytes(ENCODE);
                } catch (UnsupportedEncodingException e) {
                    throw new CanalMetaManagerException(e);
                }
    
                try {
                    zkClientx.createPersistent(filterPath, bytes);
                } catch (ZkNodeExistsException e) {
                    // ignore
                    zkClientx.writeData(filterPath, bytes);
                }
            }
        }
    

    也是非常简单调用了zk客户端代码进行了树形目录的创建和数据写入

    zk树形目录创建

    注意这里创建的树的类型。

    写入数据到zk

    这是调用zkclient代码写入数据。我这里安装了zkui 所以可以看一下数据

    zkui web形式查看写入的节点数据

    这里会有人经常遇到这样的问题就是
    canal配置canal.instance.filter.regex无效,这里说一下这个问题
    其实可以查看对应的存贮文件meta.dat或者zk上的节点数据

    有{"clientIdentity":{"clientId":1001,"destination":"example","filter":".\.."}

    所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".\.."),不然等于没设置canal.instance.filter.regex。
    如果一定要调用CanalConnector.subscribe(".\.."),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。

    总结,其实这个涉及到了设计模式,策略,根据你的配置,是否是基于zk等选择存贮方式,实现对client的管理。

    相关文章

      网友评论

        本文标题:canal源码解析之canal.meta

        本文链接:https://www.haomeiwen.com/subject/pmiomttx.html