美文网首页Canal
Canal Server启动过程

Canal Server启动过程

作者: xzb9527 | 来源:发表于2019-02-01 14:11 被阅读0次

    CanalLauncher (Canal独立版本启动的入口类)

    CanalStart (Canal Server启动类)

    CanalController (Canal控制器)

    CanalServerWithEmbeded (嵌入式版本实现)

    1.加载配置文件

    String conf = System.getProperty("canal.conf", "classpath:canal.properties");

    2.判断是否是manager模式,如果是加载远程canal.properties

    3. 启动CanalStart(Canal Server启动类)

    final CanalStater canalStater =new CanalStater();

    canalStater.start(properties);

    3.1 CanalStart start方法解析

    1.判断Server模式,默认tcp,我这里采用Kafka

    String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);

    2.创建Kafka生产者

    if (serverMode.equalsIgnoreCase("kafka")) {

    canalMQProducer =new CanalKafkaProducer();

    }

    3. 创建CanalController启动canal server

    CanalController start解析:

    1.创建Zookeeper节点信息:/otter/canal/cluster/ip:port

    2. 添加zookeeper节点监听

    if (zkclientx !=null) {

    this.zkclientx.subscribeStateChanges(new IZkStateListener() {

    public void handleStateChanged(KeeperState state)throws Exception {

    }

    public void handleNewSession()throws Exception {

    initCid(path);

            }

    @Override

            public void handleSessionEstablishmentError(Throwable error)throws Exception {

    logger.error("failed to connect to zookeeper", error);

            }

    });

    }

    2.启动Embeded服务

    embededCanalServer.start();

    3. CanalServerWithEmbeded start分析

    1. 创建CanalInstance

    canalInstances = MigrateMap.makeComputingMap(new Function() {

    public CanalInstanceapply(String destination) {

    return canalInstanceGenerator.generate(destination);

        }

    });

    2.尝试启动非Lazy通道

    ServerRunningMonitor(针对Server的Running节点控制)

    相关文章

      网友评论

        本文标题:Canal Server启动过程

        本文链接:https://www.haomeiwen.com/subject/fjjakqtx.html