CanalLauncher (entry class for the standalone Canal deployment)
CanalStater (Canal Server startup class)
CanalController (Canal controller)
CanalServerWithEmbeded (embedded server implementation)
1. Load the configuration file
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
2. Check whether manager mode is enabled; if it is, load the remote canal.properties
3. Start CanalStater (the Canal Server startup class)
final CanalStater canalStater = new CanalStater();
canalStater.start(properties);
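The configuration lookup in step 1 can be sketched as follows. This is a minimal illustration, not Canal's own code: the class ConfLoaderSketch and its methods are hypothetical names, and the handling of the "classpath:" prefix is an assumption inferred from the default value shown above.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Hypothetical helper (not part of Canal): loads the file named by the canal.conf system property. */
final class ConfLoaderSketch {
    // Assumed convention: locations starting with this prefix are read from the classpath.
    static final String CLASSPATH_PREFIX = "classpath:";

    /** True when the location should be read from the classpath rather than the file system. */
    static boolean isClasspath(String conf) {
        return conf.startsWith(CLASSPATH_PREFIX);
    }

    /** Strips the "classpath:" prefix if present; file system paths are returned unchanged. */
    static String resourceName(String conf) {
        return isClasspath(conf) ? conf.substring(CLASSPATH_PREFIX.length()) : conf;
    }

    /** Loads the properties from the classpath or from the file system, depending on the prefix. */
    static Properties load(String conf) throws IOException {
        Properties properties = new Properties();
        try (InputStream in = isClasspath(conf)
                ? ConfLoaderSketch.class.getClassLoader().getResourceAsStream(resourceName(conf))
                : new FileInputStream(conf)) {
            if (in == null) {
                throw new IOException("configuration not found: " + conf);
            }
            properties.load(in);
        }
        return properties;
    }
}
```

A caller would pass in System.getProperty("canal.conf", "classpath:canal.properties") and hand the resulting Properties to the startup class.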
3.1 CanalStater start method walkthrough
1. Determine the server mode. The default is tcp; I use Kafka here
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
2. Create the Kafka producer
if (serverMode.equalsIgnoreCase("kafka")) {
    canalMQProducer = new CanalKafkaProducer();
}
3. Create the CanalController and start the canal server
CanalController start walkthrough:
1. Create the ZooKeeper node for this server: /otter/canal/cluster/ip:port
2. Add a ZooKeeper state listener
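The mode check in steps 1 and 2 can be sketched in isolation. This is only an illustration under assumptions: the property key "canal.serverMode" is assumed (the post shows only the constant CanalConstants.CANAL_SERVER_MODE), and ServerModeSketch and its Producer interface are hypothetical stand-ins rather than Canal types.

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical sketch (not Canal's code): picks a producer based on the configured server mode. */
final class ServerModeSketch {
    // Assumed property key; the post only shows the constant CanalConstants.CANAL_SERVER_MODE.
    static final String SERVER_MODE_KEY = "canal.serverMode";

    /** Hypothetical stand-in for the MQ producer type. */
    interface Producer {
        String name();
    }

    /** Reads the server mode, falling back to "tcp" when the key is absent. */
    static String serverMode(Properties properties) {
        String mode = properties.getProperty(SERVER_MODE_KEY, "tcp");
        return mode.trim().toLowerCase(Locale.ROOT);
    }

    /** Returns a producer for "kafka" and null for any other mode, such as the default tcp. */
    static Producer createProducer(Properties properties) {
        if ("kafka".equals(serverMode(properties))) {
            return () -> "kafka";
        }
        return null;
    }
}
```

Reading the mode with a default means a missing key cannot cause a NullPointerException in the comparison.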
if (zkclientx != null) {
    this.zkclientx.subscribeStateChanges(new IZkStateListener() {
        @Override
        public void handleStateChanged(KeeperState state) throws Exception {
            // nothing to do on a plain state change
        }
        @Override
        public void handleNewSession() throws Exception {
            // a new session was established, so register this server's node again
            initCid(path);
        }
        @Override
        public void handleSessionEstablishmentError(Throwable error) throws Exception {
            logger.error("failed to connect to zookeeper", error);
        }
    });
}
3. Start the embedded server
embededCanalServer.start();
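Why the listener calls initCid(path) again in handleNewSession can be shown with a small in-memory model. The sketch assumes the cluster node is ephemeral, which the post does not state explicitly; ZooKeeper deletes ephemeral nodes when their session ends, so the node has to be recreated under the new session. ClusterNodeSketch and FakeSession are hypothetical names and no real ZooKeeper client is involved.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model (not Canal's code) of re-registering a node after session loss. */
final class ClusterNodeSketch {
    static final String CLUSTER_ROOT = "/otter/canal/cluster";

    /** Builds the node path in the /otter/canal/cluster/ip:port layout. */
    static String clusterPath(String ip, int port) {
        return CLUSTER_ROOT + "/" + ip + ":" + port;
    }

    /** Stand-in for a ZooKeeper session that owns ephemeral nodes. */
    static final class FakeSession {
        private final Set<String> ephemeralNodes = new HashSet<>();
        private final String path;

        FakeSession(String path) {
            this.path = path;
            register();
        }

        /** Plays the role of initCid(path): creates this server's node. */
        private void register() {
            ephemeralNodes.add(path);
        }

        /** Models session expiry: every ephemeral node of the session disappears. */
        void expire() {
            ephemeralNodes.clear();
        }

        /** Models the listener callback: the node is created again under the new session. */
        void handleNewSession() {
            register();
        }

        boolean exists(String candidate) {
            return ephemeralNodes.contains(candidate);
        }
    }
}
```

Without the callback, a server that survives a session expiry would keep running while no longer appearing under the cluster path.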
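CanalServerWithEmbeded start walkthrough:
1. Create the CanalInstance map; an instance is generated the first time its destination is requested
canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
    public CanalInstance apply(String destination) {
        return canalInstanceGenerator.generate(destination);
    }
});
2. Try to start the non-lazy channels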
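The computing-map idea from step 1 can be illustrated with the JDK alone. This sketch swaps in ConcurrentHashMap.computeIfAbsent for MigrateMap.makeComputingMap, so it is an analogy rather than Canal's implementation; LazyInstanceMapSketch is a hypothetical name, and the generator function stands in for canalInstanceGenerator.generate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: a map that creates the value for a destination on first access. */
final class LazyInstanceMapSketch<V> {
    private final Map<String, V> instances = new ConcurrentHashMap<>();
    private final Function<String, V> generator;
    private final AtomicInteger generated = new AtomicInteger();

    LazyInstanceMapSketch(Function<String, V> generator) {
        this.generator = generator;
    }

    /** Returns the instance for the destination, generating it only if it does not exist yet. */
    V get(String destination) {
        return instances.computeIfAbsent(destination, d -> {
            generated.incrementAndGet();
            return generator.apply(d);
        });
    }

    /** Number of instances generated so far, useful for observing the lazy behaviour. */
    int generatedCount() {
        return generated.get();
    }
}
```

Under this model, starting a non-lazy channel presumably amounts to requesting its destination from the map at server startup instead of waiting for the first client request.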
ServerRunningMonitor (controls the Server's running node)