Canal中的角色
Canal中包含三种角色,分别是:
- Canal server,通过canal-deployer部署,用于抽取binlog
- Canal manager,canal的管理控制台,在canal server中被称为manager,代码在canal-admin子工程中,提供webui,可选
- Zookeeper:用于存储元数据,canal instance的HA保障
相关源码简单阅读
canal server 与canal admin
打开canal的启动类CanalLatcher,在其main方法中可看到如下代码段:
image这里是canal server通过canal admin获取server配置,有两处获取配置的代码:
- 启动时先获取server的配置配置,参数为null
- 默认每5秒获取一次配置,参数为上一次获取的配置的md5
获取配置后,有canalServer.start()方法的调用,用于启动canal server。
从CanalStarter.start()的调用中可以看到创建了CanalController,而CanalController的构造器中有如下代码片断:
image在InstanceConfigMonitor中,会定时获取Instance配置。
打开canalServer.start()方法,在最后面有代码片断:
image此处有个canalAdminWithNetty.Start(),从类名和注解来看,这里用了netty并启动了canalAdmin,刚开始看到这里,我就有点懵逼了,这里有蹦出了个admin,manager所在的子工程也叫canal-admin,代码命名上不太友好,canal server中有与manager类似的概念,我们继续进入到canalAdminWithNetty.start()方法:
image这里是使用了netty监听网络数据,端口则是canal.properties里配置的canal.admin.port,我们再来看看创建netty的Bootstrap所使用的ChannelPipeline,其中最重要的是SessionHandler,在SessionHandler中处理读取到的网络数据包:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
logger.info("message receives in session handler...");
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
try {
String action = null;
String message = null;
String destination = null;
switch (packet.getType()) {
case SERVER:
ServerAdmin serverAdmin = ServerAdmin.parseFrom(packet.getBody());
action = serverAdmin.getAction();
switch (action) {
case "check":
message = canalAdmin.check() ? "1" : "0";
break;
case "start":
message = canalAdmin.start() ? "1" : "0";
break;
case "stop":
message = canalAdmin.stop() ? "1" : "0";
break;
case "restart":
message = canalAdmin.restart() ? "1" : "0";
break;
case "list":
message = canalAdmin.getRunningInstances();
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("ServerAdmin action={} is unknown", action).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
break;
case INSTANCE:
InstanceAdmin instanceAdmin = InstanceAdmin.parseFrom(packet.getBody());
destination = instanceAdmin.getDestination();
action = instanceAdmin.getAction();
switch (action) {
case "check":
message = canalAdmin.checkInstance(destination) ? "1" : "0";
break;
case "start":
message = canalAdmin.startInstance(destination) ? "1" : "0";
break;
case "stop":
message = canalAdmin.stopInstance(destination) ? "1" : "0";
break;
case "release":
message = canalAdmin.releaseInstance(destination) ? "1" : "0";
break;
case "restart":
message = canalAdmin.restartInstance(destination) ? "1" : "0";
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("InstanceAdmin action={} is unknown", action).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
break;
case LOG:
LogAdmin logAdmin = LogAdmin.parseFrom(packet.getBody());
action = logAdmin.getAction();
destination = logAdmin.getDestination();
String type = logAdmin.getType();
String file = logAdmin.getFile();
int count = logAdmin.getCount();
switch (type) {
case "server":
if ("list".equalsIgnoreCase(action)) {
message = canalAdmin.listCanalLog();
} else {
message = canalAdmin.canalLog(count);
}
break;
case "instance":
if ("list".equalsIgnoreCase(action)) {
message = canalAdmin.listInstanceLog(destination);
} else {
message = canalAdmin.instanceLog(destination, file, count);
}
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(301,
MessageFormatter.format("LogAdmin type={} is unknown", type).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
break;
default:
byte[] errorBytes = AdminNettyUtils.errorPacket(300,
MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
break;
}
} catch (Throwable exception) {
byte[] errorBytes = AdminNettyUtils.errorPacket(400,
MessageFormatter.format("something goes wrong with channel:{}, exception={}",
ctx.getChannel(),
ExceptionUtils.getStackTrace(exception)).getMessage());
AdminNettyUtils.write(ctx.getChannel(), errorBytes);
}
}
看到这里也就明白了,这个CanalAdminWithNetty实际上就是给manager调用的,用于对canal做管理,在manager的webui上可以看到对应的功能:
image那么canal server与canal manager之间是一个双向调用的关系:
imagecanal-admin有两个作用:
- 类似于配置中心,所有canal server从canal-admin定时摘取配置,包括server配置与instance配置,也就是canal.properties和instance.properties的内容
- 对canal server和canal instance的状态做管控,控制server/instance的启动停止等
canal server与canal-admin之间是双向调用,canal server即是canal-admin的客户端,同时也会响应canal admin的请求
canal server与zookeeper
回到CanalController类,我们可以在start方法中看到,在启动时,会向zk注册一个临时节点,此目录包含当前机器的ip和端口(配置中的canal.registerIp, canal.port),当有新的session建议时,会重新注册该节点,很明显,这里类似于注册中心的作用,canal server启动时,先向zk注册自身的地址:
image在CanalController中,我们可以看到ServerRunningMonitor对象的创建代码:
imageServerRunningMonitor是针对每 一个destination的,即一个instance会创建一个ServerRunningMonitor,在ServerRunningMonitor.initRunning()方法中,会从zk上创建节点,成功则初始化instance,如果失败,则只从zk获取到运行instance的节点数据:
image总结
从CanalController与ServerRunningMonitor中可以看到有两处zk交互点,一个是注册自身的ip+端口,一个是创建节点并初始化instance(分布式锁),总体上看,三角色之间有如下交互关系:
image
网友评论