美文网首页
注册中心

注册中心

作者: 理想是一盏灯 | 来源:发表于2021-08-05 15:09 被阅读0次

    类结构图

    类结构图

    RegistryCenter 为操作注册中心的顶层接口。
    CoordinatorRegistryCenter 继承RegistryCenter 接口,并多加了一些操作节点的方法,比如创建临时节点/持久化顺序节点/临时顺序节点的方法,同时加了本地缓存注册中心节点数据的相关方法。
    ZookeeperRegistryCenter 实现CoordinatorRegistryCenter 的所有方法,基于curator实现。
    ZookeeperConfiguration为zk注册中心配置类,供ZookeeperRegistryCenter 连接zk时设置相关连接参数时使用。

    RegistryCenter

    RegistryCenter 接口定义了对注册中心进行初始化和增删改查相关操作的方法,其他子类实现都必须实现该接口


    RegistryCenter方法

    每个方法的功能如下表

    方法 功能
    RegistryCenter init 初始化注册中心
    RegistryCenter close 关闭注册中心
    RegistryCenter get 获取注册数据
    RegistryCenter isExisted 获取数据是否存在
    RegistryCenter persist 持久化注册数据
    RegistryCenter update 更新注册数据
    RegistryCenter remove 删除注册数据
    RegistryCenter getRegistryCenterTime 获取注册中心当前时间
    RegistryCenter getRawClient 直接获取操作注册中心的原生客户端

    源代码如下:

    public interface RegistryCenter {
        
        /**
         * 初始化注册中心.
         */
        void init();
        /**
         * 关闭注册中心.
         */
        void close();
        /**
         * 获取注册数据.
         * 
         * @param key 键
         * @return 值
         */
        String get(String key);
        /**
         * 获取数据是否存在.
         * 
         * @param key 键
         * @return 数据是否存在
         */
        boolean isExisted(String key);
        /**
         * 持久化注册数据.
         * 
         * @param key 键
         * @param value 值
         */
        void persist(String key, String value);
        /**
         * 更新注册数据.
         * 
         * @param key 键
         * @param value 值
         */
        void update(String key, String value);
        
        /**
         * 删除注册数据.
         * 
         * @param key 键
         */
        void remove(String key);
        
        /**
         * 获取注册中心当前时间.
         * 
         * @param key 用于获取时间的键
         * @return 注册中心当前时间
         */
        long getRegistryCenterTime(String key);
        
        /**
         * 直接获取操作注册中心的原生客户端.
         * 如:Zookeeper或Redis等原生客户端.
         * 
         * @return 注册中心的原生客户端
         */
        Object getRawClient();
    }
    

    CoordinatorRegistryCenter

    CoordinatorRegistryCenter接口继承了RegistryCenter,并加了一些方法,比如对注册中心临时/顺序节点进行操作、本地缓存相关操作、子节点相关操作


    CoordinatorRegistryCenter方法

    每个方法的功能如下表

    方法 功能
    CoordinatorRegistryCenter getDirectly 直接从注册中心而非本地缓存获取数据.
    CoordinatorRegistryCenter getChildrenKeys 获取子节点名称集合
    CoordinatorRegistryCenter getNumChildren 获取子节点数量
    CoordinatorRegistryCenter persistEphemeral 持久化临时注册数据
    CoordinatorRegistryCenter persistSequential 持久化顺序注册数据
    CoordinatorRegistryCenter persistEphemeralSequential 持久化临时顺序注册数据
    CoordinatorRegistryCenter addCacheData 添加本地缓存
    CoordinatorRegistryCenter evictCacheData 释放本地缓存
    CoordinatorRegistryCenter getRawCache 获取注册中心数据缓存对象

    代码如下

    import java.util.List;
    
    /**
     * 用于协调分布式服务的注册中心.
     * 
     * @author zhangliang
     */
    public interface CoordinatorRegistryCenter extends RegistryCenter {
        
        /**
         * 直接从注册中心而非本地缓存获取数据.
         * 
         * @param key 键
         * @return 值
         */
        String getDirectly(String key);
        
        /**
         * 获取子节点名称集合.
         * 
         * @param key 键
         * @return 子节点名称集合
         */
        List<String> getChildrenKeys(String key);
        
        /**
         * 获取子节点数量.
         *
         * @param key 键
         * @return 子节点数量
         */
        int getNumChildren(String key);
        
        /**
         * 持久化临时注册数据.
         * 
         * @param key 键
         * @param value 值
         */
        void persistEphemeral(String key, String value);
        
        /**
         * 持久化顺序注册数据.
         *
         * @param key 键
         * @param value 值
         * @return 包含10位顺序数字的znode名称
         */
        String persistSequential(String key, String value);
        
        /**
         * 持久化临时顺序注册数据.
         * 
         * @param key 键
         */
        void persistEphemeralSequential(String key);
        
        /**
         * 添加本地缓存.
         * 
         * @param cachePath 需加入缓存的路径
         */
        void addCacheData(String cachePath);
        
        /**
         * 释放本地缓存.
         *
         * @param cachePath 需释放缓存的路径
         */
        void evictCacheData(String cachePath);
        
        /**
         * 获取注册中心数据缓存对象.
         * 
         * @param cachePath 缓存的节点路径
         * @return 注册中心数据缓存对象
         */
        Object getRawCache(String cachePath);
    }
    

    ZookeeperRegistryCenter

    ZookeeperRegistryCenter 基于Curator实现了CoordinatorRegistryCenter接口的所有方法,Curator是一个基于zk原生api封装的高水平的客户端jar包,不了解的同学这个框架的可自行搜索相关资料

    初始化依赖的配置类-ZookeeperConfiguration

    public final class ZookeeperConfiguration {
        
        /**
         * 连接Zookeeper服务器的列表.
         * 包括IP地址和端口号.
         * 多个地址用逗号分隔.
         * 如: host1:2181,host2:2181
         */
        private final String serverLists;
        
        /**
         * 命名空间.
         */
        private final String namespace;
        
        /**
         * 等待重试的间隔时间的初始值.
         * 单位毫秒.
         */
        private int baseSleepTimeMilliseconds = 1000;
        
        /**
         * 等待重试的间隔时间的最大值.
         * 单位毫秒.
         */
        private int maxSleepTimeMilliseconds = 3000;
        
        /**
         * 最大重试次数.
         */
        private int maxRetries = 3;
        
        /**
         * 会话超时时间.
         * 单位毫秒.
         */
        private int sessionTimeoutMilliseconds;
        
        /**
         * 连接超时时间.
         * 单位毫秒.
         */
        private int connectionTimeoutMilliseconds;
        
        /**
         * 连接Zookeeper的权限令牌.
         * 缺省为不需要权限验证.
         */
        private String digest;
    }
    

    ZookeeperRegistryCenter 成员属性和构造方法

    1. 属性
    • curator的client
    • zk连接配置类zkConfig
    • 缓存zk注册中心数据的缓存Map:caches,key=作业名,value=该作业目录下的所有节点数据(目录树结构)
    1. 构造方法:设置属性zkConfig的值

    代码如下

    /**
     * 基于Zookeeper的注册中心.
     * 
     * @author zhangliang
     */
    @Slf4j
    public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter {
        
        @Getter(AccessLevel.PROTECTED)
        private ZookeeperConfiguration zkConfig;
        
        private final Map<String, TreeCache> caches = new HashMap<>();
        
        @Getter
        private CuratorFramework client;
        
        public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
            this.zkConfig = zkConfig;
        }
    

    初始化

     @Override
     public void init() {
         log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
         // 通过工厂+builder创建Curator client实例
         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                 // 服务器列表,格式host1:port1,host2:port2,...
                 .connectString(zkConfig.getServerLists())
                 // 重试策略
                 .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
                 //命名空间
                 .namespace(zkConfig.getNamespace());
         // 会话超时时间
         if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
             builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
         }
         //连接创建超时时间
         if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
             builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
         }
         //ACL相关
         if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
             builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
                     .aclProvider(new ACLProvider() {
                     
                         @Override
                         public List<ACL> getDefaultAcl() {
                             return ZooDefs.Ids.CREATOR_ALL_ACL;
                         }
                     
                         @Override
                         public List<ACL> getAclForPath(final String path) {
                             return ZooDefs.Ids.CREATOR_ALL_ACL;
                         }
                     });
         }
         // 构建得到Curator client实例
         client = builder.build();
         //启动Curator client实例
         client.start();
         try {
             if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                 client.close();
                 throw new KeeperException.OperationTimeoutException();
             }
             //CHECKSTYLE:OFF
         } catch (final Exception ex) {
             //CHECKSTYLE:ON
             RegExceptionHandler.handleException(ex);
         }
     }
    
    • 通过Curator的工厂CuratorFrameworkFactory+Bulider创建client实例并启动,curator框架的固定套路,不熟悉的同学可以搜索相关资料了解
    • ExponentialBackoffRetry为与zk连接断开时的重新连接策略类,它有三个核心参数,分别为重连最大次数retryCount ,等待重试的间隔时间的初始值baseSleepTimeMs ,等待重试的间隔时间的最大值maxSleepTimeMilliseconds ,每次重试睡眠的时间间隔计算方式为:
    sleepMs = this.baseSleepTimeMs * Math.max(1, this.random.nextInt(1 << retryCount + 1))
    
    • namespace命名空间,每个作业集群一个命名空间,相互隔离

    注册中心异常处理类RegExceptionHandler

    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class RegExceptionHandler {
        
        /**
         * 处理异常.
         * 
         * <p>处理掉中断和连接失效异常并继续抛注册中心.</p>
         * 
         * @param cause 待处理异常.
         */
        public static void handleException(final Exception cause) {
            if (null == cause) {
                return;
            }
            if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
                log.debug("Elastic job: ignored exception for: {}", cause.getMessage());
            } else if (cause instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            } else {
                throw new RegException(cause);
            }
        }
        
        private static boolean isIgnoredException(final Throwable cause) {
            return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
        }
    }
    
    • 注册中心所有方法出现异常都会调用该类的handleException方法
    • 部分异常会被无视,仅打印异常。例如连接丢失异常,节点不存在异常等,这种异常将被忽略,其他非中断异常InterruptedException将会被抛出给调用端

    缓存

    通过 Curator 的TreeCache 实现缓存指定目录的数据,内部有zk的watcher监听该目录的变更事件,该目录及该目录下任何节点的变更都会实时更新到缓存,不熟悉Curator 缓存机制的同学可以自行搜索了解

    添加目录缓存数据

        @Override
        public void addCacheData(final String cachePath) {
            // 缓存指定路径下的数据
            TreeCache cache = new TreeCache(client, cachePath);
            try {
                cache.start();
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
           // 将当前作业的目录缓存数据加到注册中心caches
            caches.put(cachePath + "/", cache);
        }
    
    
    • 在作业启动初始化的时候进行了调用
      com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry#registerJob
        public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
            schedulerMap.put(jobName, jobScheduleController);
            regCenterMap.put(jobName, regCenter);
            regCenter.addCacheData("/" + jobName);
        }
    
    

    订阅缓存目录变更事件

    可以订阅TreeCache的缓存目录,具体就是通过增加监听器来监听缓存目录的状态变更事件,当收到该缓存目录下任何节点的变更事件后将会回调监听器的childEvent方法。
    不熟悉Curator 缓存机制的同学可以自行搜索了解
    后面文章讲的注册中心监听器,都会订阅缓存目录的事件实现其功能逻辑。
    com.dangdang.ddframe.job.lite.internal.storage.JobNodeStorage#addDataListener

       /**
         * 注册数据监听器.
         * 
         * @param listener 数据监听器
    
         */
        public void addDataListener(final TreeCacheListener listener) {
            TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
            cache.getListenable().addListener(listener);
        }
        
    

    订阅发生的时机在作业启动初始时的如下类中,作业启动初始化后面讲解


    订阅缓存目录变更事件类

    释放指定目录下缓存数据

        //释放指定路径下的缓存数据
        @Override
        public void evictCacheData(final String cachePath) {
            TreeCache cache = caches.remove(cachePath + "/");
            if (null != cache) {
                cache.close();
            }
        }   
    

    获取指定路径下的缓存数据

      @Override
        //获取指定路径的缓存数据
        public Object getRawCache(final String cachePath) {
            return caches.get(cachePath + "/");
        }
    

    获取数据

    优先从缓存获取数据

        @Override
        public String get(final String key) {
            //先优先从本地缓存获取数据
            TreeCache cache = findTreeCache(key);
            if (null == cache) {
                return getDirectly(key);
            }
            //根据key从缓存获取数据
            ChildData resultInCache = cache.getCurrentData(key);
            if (null != resultInCache) {
                return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
            }
            // 本地缓存获取不到,直接从注册中心获取
            return getDirectly(key);
        }
    
        // 从缓存获取数据
        private TreeCache findTreeCache(final String key) {
            for (Entry<String, TreeCache> entry : caches.entrySet()) {
                if (key.startsWith(entry.getKey())) {
                    return entry.getValue();
                }
            }
            return null;
        }
    
    • 优先从本地缓存获取数据
    • 如果本地缓存没有,从注册中心获取数据

    从注册中心获取数据

        //从注册中心获取数据
        @Override
        public String getDirectly(final String key) {
            try {
                return new String(client.getData().forPath(key), Charsets.UTF_8);
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
                return null;
            }
        }
    

    子节点相关

    获取子节点方法

        @Override
        public List<String> getChildrenKeys(final String key) {
            try {
                //获取节点的子节点
                List<String> result = client.getChildren().forPath(key);
                //节点倒序
                Collections.sort(result, new Comparator<String>() {
                    
                    @Override
                    public int compare(final String o1, final String o2) {
                        return o2.compareTo(o1);
                    }
                });
                return result;
             //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
                return Collections.emptyList();
            }
        }
    
    • 对获取的子节点按节点名称进行了排序,倒叙

    获取子节点数量方法

        @Override
        public int getNumChildren(final String key) {
            try {
                // 获取节点的子节点数量
                Stat stat = client.checkExists().forPath(key);
                if (null != stat) {
                    return stat.getNumChildren();
                }
                //CHECKSTYLE:OFF
            } catch (final Exception ex) {
                //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
            return 0;
        }
    

    节点操作相关

    判断节点是否存在

        @Override
        public boolean isExisted(final String key) {
            try {
                //判断节点是否存在
                return null != client.checkExists().forPath(key);
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
                return false;
            }
        }
    

    创建持久化节点

        @Override
        public void persist(final String key, final String value) {
            try {
                //不存在则创建持久化节点
                if (!isExisted(key)) {
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
                } else {
                    //存在则更新
                    update(key, value);
                }
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
        }
       
    

    更新节点方法

        @Override
        public void update(final String key, final String value) {
            try {
                // 更新数据
                client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
        }
        
    
    • 使用事务校验键节点存在才进行更新

    创建临时节点方法

        @Override
        public void persistEphemeral(final String key, final String value) {
            try {
                //存在则删除节点
                if (isExisted(key)) {
                    client.delete().deletingChildrenIfNeeded().forPath(key);
                }
                //创建临时节点
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
        }
       
    

    创建持久化顺序节点

    该方法在当前版本未使用,可以不用关注

        @Override
        public String persistSequential(final String key, final String value) {
            try {
                //创建持久化的顺序节点
                return client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(key, value.getBytes(Charsets.UTF_8));
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
            return null;
        }
        
    

    创建临时顺序节点方法

        @Override
        public void persistEphemeralSequential(final String key) {
            try {
                //创建临时顺序节点
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key);
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
        }
    

    移除节点方法

        @Override
        public void remove(final String key) {
            try {
                //移除节点
                client.delete().deletingChildrenIfNeeded().forPath(key);
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
        }    
    

    获取注册中心当前时间

        @Override
        public long getRegistryCenterTime(final String key) {
            long result = 0L;
            try {
                persist(key, "");
                //获取指定节点的注册中心时间
                result = client.checkExists().forPath(key).getMtime();
            //CHECKSTYLE:OFF
            } catch (final Exception ex) {
            //CHECKSTYLE:ON
                RegExceptionHandler.handleException(ex);
            }
            Preconditions.checkState(0L != result, "Cannot get registry center time.");
            return result;
        }
    
    • 通过更新节点,获取节点最后的更新时间作为注册中心的当前时间

    获取注册中心原生客户端

        @Override
        public Object getRawClient() {
            //获取curator的client
            return client;
        }
    

    关闭注册中心连接

        @Override
        public void close() {
            // 先关闭缓存
            for (Entry<String, TreeCache> each : caches.entrySet()) {
                each.getValue().close();
            }
            waitForCacheClose();
            //再关闭连接
            CloseableUtils.closeQuietly(client);
        }
        
        /* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常
         * 因为异步处理, 可能会导致client先关闭而cache还未关闭结束.
         * 等待Curator新版本解决这个bug.
         * BUG地址:https://issues.apache.org/jira/browse/CURATOR-157
         */
        private void waitForCacheClose() {
            try {
                Thread.sleep(500L);
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    
    • 先关闭缓存,清空缓存相关数据
    • 再关闭client

    相关文章

      网友评论

          本文标题:注册中心

          本文链接:https://www.haomeiwen.com/subject/sginmltx.html