美文网首页
Canal部署各角色如何协作

Canal部署各角色如何协作

作者: 无醉_1866 | 来源:发表于2019-10-13 13:39 被阅读0次

    Canal中的角色

    Canal中包含三种角色,分别是:

    • Canal server,通过canal-deployer部署,用于抽取binlog
    • Canal manager,canal的管理控制台,在canal server中被称为manager,代码在canal-admin子工程中,提供webui,可选
    • Zookeeper:用于存储元数据,canal instance的HA保障

    相关源码简单阅读

    canal server 与canal admin

    打开canal的启动类CanalLatcher,在其main方法中可看到如下代码段:

    image

    这里是canal server通过canal admin获取server配置,有两处获取配置的代码:

    • 启动时先获取server的配置配置,参数为null
    • 默认每5秒获取一次配置,参数为上一次获取的配置的md5

    获取配置后,有canalServer.start()方法的调用,用于启动canal server。

    从CanalStarter.start()的调用中可以看到创建了CanalController,而CanalController的构造器中有如下代码片断:

    image

    在InstanceConfigMonitor中,会定时获取Instance配置。

    打开canalServer.start()方法,在最后面有代码片断:

    image

    此处有个canalAdminWithNetty.Start(),从类名和注解来看,这里用了netty并启动了canalAdmin,刚开始看到这里,我就有点懵逼了,这里有蹦出了个admin,manager所在的子工程也叫canal-admin,代码命名上不太友好,canal server中有与manager类似的概念,我们继续进入到canalAdminWithNetty.start()方法:

    image

    这里是使用了netty监听网络数据,端口则是canal.properties里配置的canal.admin.port,我们再来看看创建netty的Bootstrap所使用的ChannelPipeline,其中最重要的是SessionHandler,在SessionHandler中处理读取到的网络数据包:

      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        logger.info("message receives in session handler...");
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        try {
          String action = null;
          String message = null;
          String destination = null;
          switch (packet.getType()) {
            case SERVER:
              ServerAdmin serverAdmin = ServerAdmin.parseFrom(packet.getBody());
              action = serverAdmin.getAction();
              switch (action) {
                case "check":
                  message = canalAdmin.check() ? "1" : "0";
                  break;
                case "start":
                  message = canalAdmin.start() ? "1" : "0";
                  break;
                case "stop":
                  message = canalAdmin.stop() ? "1" : "0";
                  break;
                case "restart":
                  message = canalAdmin.restart() ? "1" : "0";
                  break;
                case "list":
                  message = canalAdmin.getRunningInstances();
                  break;
                default:
                  byte[] errorBytes = AdminNettyUtils.errorPacket(301,
                      MessageFormatter.format("ServerAdmin action={} is unknown", action).getMessage());
                  AdminNettyUtils.write(ctx.getChannel(), errorBytes);
                  break;
              }
              AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
              break;
            case INSTANCE:
              InstanceAdmin instanceAdmin = InstanceAdmin.parseFrom(packet.getBody());
              destination = instanceAdmin.getDestination();
              action = instanceAdmin.getAction();
              switch (action) {
                case "check":
                  message = canalAdmin.checkInstance(destination) ? "1" : "0";
                  break;
                case "start":
                  message = canalAdmin.startInstance(destination) ? "1" : "0";
                  break;
                case "stop":
                  message = canalAdmin.stopInstance(destination) ? "1" : "0";
                  break;
                case "release":
                  message = canalAdmin.releaseInstance(destination) ? "1" : "0";
                  break;
                case "restart":
                  message = canalAdmin.restartInstance(destination) ? "1" : "0";
                  break;
                default:
                  byte[] errorBytes = AdminNettyUtils.errorPacket(301,
                      MessageFormatter.format("InstanceAdmin action={} is unknown", action).getMessage());
                  AdminNettyUtils.write(ctx.getChannel(), errorBytes);
                  break;
              }
              AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
              break;
            case LOG:
              LogAdmin logAdmin = LogAdmin.parseFrom(packet.getBody());
              action = logAdmin.getAction();
              destination = logAdmin.getDestination();
              String type = logAdmin.getType();
              String file = logAdmin.getFile();
              int count = logAdmin.getCount();
              switch (type) {
                case "server":
                  if ("list".equalsIgnoreCase(action)) {
                    message = canalAdmin.listCanalLog();
                  } else {
                    message = canalAdmin.canalLog(count);
                  }
                  break;
                case "instance":
                  if ("list".equalsIgnoreCase(action)) {
                    message = canalAdmin.listInstanceLog(destination);
                  } else {
                    message = canalAdmin.instanceLog(destination, file, count);
                  }
                  break;
                default:
                  byte[] errorBytes = AdminNettyUtils.errorPacket(301,
                      MessageFormatter.format("LogAdmin type={} is unknown", type).getMessage());
                  AdminNettyUtils.write(ctx.getChannel(), errorBytes);
                  break;
              }
              AdminNettyUtils.write(ctx.getChannel(), AdminNettyUtils.ackPacket(message));
              break;
            default:
              byte[] errorBytes = AdminNettyUtils.errorPacket(300,
                  MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage());
              AdminNettyUtils.write(ctx.getChannel(), errorBytes);
              break;
          }
        } catch (Throwable exception) {
          byte[] errorBytes = AdminNettyUtils.errorPacket(400,
              MessageFormatter.format("something goes wrong with channel:{}, exception={}",
                  ctx.getChannel(),
                  ExceptionUtils.getStackTrace(exception)).getMessage());
          AdminNettyUtils.write(ctx.getChannel(), errorBytes);
        }
      }
    
    

    看到这里也就明白了,这个CanalAdminWithNetty实际上就是给manager调用的,用于对canal做管理,在manager的webui上可以看到对应的功能:

    image

    那么canal server与canal manager之间是一个双向调用的关系:

    image

    canal-admin有两个作用:

    • 类似于配置中心,所有canal server从canal-admin定时摘取配置,包括server配置与instance配置,也就是canal.properties和instance.properties的内容
    • 对canal server和canal instance的状态做管控,控制server/instance的启动停止等

    canal server与canal-admin之间是双向调用,canal server即是canal-admin的客户端,同时也会响应canal admin的请求

    canal server与zookeeper

    回到CanalController类,我们可以在start方法中看到,在启动时,会向zk注册一个临时节点,此目录包含当前机器的ip和端口(配置中的canal.registerIp, canal.port),当有新的session建议时,会重新注册该节点,很明显,这里类似于注册中心的作用,canal server启动时,先向zk注册自身的地址:

    image

    在CanalController中,我们可以看到ServerRunningMonitor对象的创建代码:

    image

    ServerRunningMonitor是针对每 一个destination的,即一个instance会创建一个ServerRunningMonitor,在ServerRunningMonitor.initRunning()方法中,会从zk上创建节点,成功则初始化instance,如果失败,则只从zk获取到运行instance的节点数据:

    image

    总结

    从CanalController与ServerRunningMonitor中可以看到有两处zk交互点,一个是注册自身的ip+端口,一个是创建节点并初始化instance(分布式锁),总体上看,三角色之间有如下交互关系:

    image

    相关文章

      网友评论

          本文标题:Canal部署各角色如何协作

          本文链接:https://www.haomeiwen.com/subject/kywhmctx.html