类结构图
类结构图RegistryCenter 为操作注册中心的顶层接口。
CoordinatorRegistryCenter 继承RegistryCenter 接口,并多加了一些操作节点的方法,比如创建临时节点/持久化顺序节点/临时顺序节点的方法,同时加了本地缓存注册中心节点数据的相关方法。
ZookeeperRegistryCenter 实现CoordinatorRegistryCenter 的所有方法,基于curator实现。
ZookeeperConfiguration为zk注册中心配置类,供ZookeeperRegistryCenter 连接zk时设置相关连接参数时使用。
RegistryCenter
RegistryCenter 接口定义了对注册中心进行初始化和增删改查相关操作的方法,其他子类实现都必须实现该接口
RegistryCenter方法
每个方法的功能如下表
类 | 方法 | 功能 |
---|---|---|
RegistryCenter | init | 初始化注册中心 |
RegistryCenter | close | 关闭注册中心 |
RegistryCenter | get | 获取注册数据 |
RegistryCenter | isExisted | 获取数据是否存在 |
RegistryCenter | persist | 持久化注册数据 |
RegistryCenter | update | 更新注册数据 |
RegistryCenter | remove | 删除注册数据 |
RegistryCenter | getRegistryCenterTime | 获取注册中心当前时间 |
RegistryCenter | getRawClient | 直接获取操作注册中心的原生客户端 |
源代码如下:
public interface RegistryCenter {
/**
* 初始化注册中心.
*/
void init();
/**
* 关闭注册中心.
*/
void close();
/**
* 获取注册数据.
*
* @param key 键
* @return 值
*/
String get(String key);
/**
* 获取数据是否存在.
*
* @param key 键
* @return 数据是否存在
*/
boolean isExisted(String key);
/**
* 持久化注册数据.
*
* @param key 键
* @param value 值
*/
void persist(String key, String value);
/**
* 更新注册数据.
*
* @param key 键
* @param value 值
*/
void update(String key, String value);
/**
* 删除注册数据.
*
* @param key 键
*/
void remove(String key);
/**
* 获取注册中心当前时间.
*
* @param key 用于获取时间的键
* @return 注册中心当前时间
*/
long getRegistryCenterTime(String key);
/**
* 直接获取操作注册中心的原生客户端.
* 如:Zookeeper或Redis等原生客户端.
*
* @return 注册中心的原生客户端
*/
Object getRawClient();
}
CoordinatorRegistryCenter
CoordinatorRegistryCenter接口继承了RegistryCenter,并加了一些方法,比如对注册中心临时/顺序节点进行操作、本地缓存相关操作、子节点相关操作
CoordinatorRegistryCenter方法
每个方法的功能如下表
类 | 方法 | 功能 |
---|---|---|
CoordinatorRegistryCenter | getDirectly | 直接从注册中心而非本地缓存获取数据. |
CoordinatorRegistryCenter | getChildrenKeys | 获取子节点名称集合 |
CoordinatorRegistryCenter | getNumChildren | 获取子节点数量 |
CoordinatorRegistryCenter | persistEphemeral | 持久化临时注册数据 |
CoordinatorRegistryCenter | persistSequential | 持久化顺序注册数据 |
CoordinatorRegistryCenter | persistEphemeralSequential | 持久化临时顺序注册数据 |
CoordinatorRegistryCenter | addCacheData | 添加本地缓存 |
CoordinatorRegistryCenter | evictCacheData | 释放本地缓存 |
CoordinatorRegistryCenter | getRawCache | 获取注册中心数据缓存对象 |
代码如下
import java.util.List;
/**
* 用于协调分布式服务的注册中心.
*
* @author zhangliang
*/
public interface CoordinatorRegistryCenter extends RegistryCenter {
/**
* 直接从注册中心而非本地缓存获取数据.
*
* @param key 键
* @return 值
*/
String getDirectly(String key);
/**
* 获取子节点名称集合.
*
* @param key 键
* @return 子节点名称集合
*/
List<String> getChildrenKeys(String key);
/**
* 获取子节点数量.
*
* @param key 键
* @return 子节点数量
*/
int getNumChildren(String key);
/**
* 持久化临时注册数据.
*
* @param key 键
* @param value 值
*/
void persistEphemeral(String key, String value);
/**
* 持久化顺序注册数据.
*
* @param key 键
* @param value 值
* @return 包含10位顺序数字的znode名称
*/
String persistSequential(String key, String value);
/**
* 持久化临时顺序注册数据.
*
* @param key 键
*/
void persistEphemeralSequential(String key);
/**
* 添加本地缓存.
*
* @param cachePath 需加入缓存的路径
*/
void addCacheData(String cachePath);
/**
* 释放本地缓存.
*
* @param cachePath 需释放缓存的路径
*/
void evictCacheData(String cachePath);
/**
* 获取注册中心数据缓存对象.
*
* @param cachePath 缓存的节点路径
* @return 注册中心数据缓存对象
*/
Object getRawCache(String cachePath);
}
ZookeeperRegistryCenter
ZookeeperRegistryCenter 基于Curator实现了CoordinatorRegistryCenter接口的所有方法,Curator是一个基于zk原生api封装的高水平的客户端jar包,不了解的同学这个框架的可自行搜索相关资料
初始化依赖的配置类-ZookeeperConfiguration
public final class ZookeeperConfiguration {
/**
* 连接Zookeeper服务器的列表.
* 包括IP地址和端口号.
* 多个地址用逗号分隔.
* 如: host1:2181,host2:2181
*/
private final String serverLists;
/**
* 命名空间.
*/
private final String namespace;
/**
* 等待重试的间隔时间的初始值.
* 单位毫秒.
*/
private int baseSleepTimeMilliseconds = 1000;
/**
* 等待重试的间隔时间的最大值.
* 单位毫秒.
*/
private int maxSleepTimeMilliseconds = 3000;
/**
* 最大重试次数.
*/
private int maxRetries = 3;
/**
* 会话超时时间.
* 单位毫秒.
*/
private int sessionTimeoutMilliseconds;
/**
* 连接超时时间.
* 单位毫秒.
*/
private int connectionTimeoutMilliseconds;
/**
* 连接Zookeeper的权限令牌.
* 缺省为不需要权限验证.
*/
private String digest;
}
ZookeeperRegistryCenter 成员属性和构造方法
- 属性
- curator的client
- zk连接配置类zkConfig
- 缓存zk注册中心数据的缓存Map:caches,key=作业名,value=该作业目录下的所有节点数据(目录树结构)
- 构造方法:设置属性zkConfig的值
代码如下
/**
* 基于Zookeeper的注册中心.
*
* @author zhangliang
*/
@Slf4j
public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter {
@Getter(AccessLevel.PROTECTED)
private ZookeeperConfiguration zkConfig;
private final Map<String, TreeCache> caches = new HashMap<>();
@Getter
private CuratorFramework client;
public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
this.zkConfig = zkConfig;
}
初始化
@Override
public void init() {
log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
// 通过工厂+builder创建Curator client实例
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
// 服务器列表,格式host1:port1,host2:port2,...
.connectString(zkConfig.getServerLists())
// 重试策略
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
//命名空间
.namespace(zkConfig.getNamespace());
// 会话超时时间
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
}
//连接创建超时时间
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
}
//ACL相关
if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
// 构建得到Curator client实例
client = builder.build();
//启动Curator client实例
client.start();
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
- 通过Curator的工厂CuratorFrameworkFactory+Bulider创建client实例并启动,curator框架的固定套路,不熟悉的同学可以搜索相关资料了解
- ExponentialBackoffRetry为与zk连接断开时的重新连接策略类,它有三个核心参数,分别为重连最大次数retryCount ,等待重试的间隔时间的初始值baseSleepTimeMs ,等待重试的间隔时间的最大值maxSleepTimeMilliseconds ,每次重试睡眠的时间间隔计算方式为:
sleepMs = this.baseSleepTimeMs * Math.max(1, this.random.nextInt(1 << retryCount + 1))
- namespace命名空间,每个作业集群一个命名空间,相互隔离
注册中心异常处理类RegExceptionHandler
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class RegExceptionHandler {
/**
* 处理异常.
*
* <p>处理掉中断和连接失效异常并继续抛注册中心.</p>
*
* @param cause 待处理异常.
*/
public static void handleException(final Exception cause) {
if (null == cause) {
return;
}
if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
log.debug("Elastic job: ignored exception for: {}", cause.getMessage());
} else if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
throw new RegException(cause);
}
}
private static boolean isIgnoredException(final Throwable cause) {
return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
}
}
- 注册中心所有方法出现异常都会调用该类的handleException方法
- 部分异常会被无视,仅打印异常。例如连接丢失异常,节点不存在异常等,这种异常将被忽略,其他非中断异常InterruptedException将会被抛出给调用端
缓存
通过 Curator 的TreeCache 实现缓存指定目录的数据,内部有zk的watcher监听该目录的变更事件,该目录及该目录下任何节点的变更都会实时更新到缓存,不熟悉Curator 缓存机制的同学可以自行搜索了解
添加目录缓存数据
@Override
public void addCacheData(final String cachePath) {
// 缓存指定路径下的数据
TreeCache cache = new TreeCache(client, cachePath);
try {
cache.start();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
// 将当前作业的目录缓存数据加到注册中心caches
caches.put(cachePath + "/", cache);
}
- 在作业启动初始化的时候进行了调用
com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry#registerJob
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
schedulerMap.put(jobName, jobScheduleController);
regCenterMap.put(jobName, regCenter);
regCenter.addCacheData("/" + jobName);
}
订阅缓存目录变更事件
可以订阅TreeCache的缓存目录,具体就是通过增加监听器来监听缓存目录的状态变更事件,当收到该缓存目录下任何节点的变更事件后将会回调监听器的childEvent方法。
不熟悉Curator 缓存机制的同学可以自行搜索了解
后面文章讲的注册中心监听器,都会订阅缓存目录的事件实现其功能逻辑。
com.dangdang.ddframe.job.lite.internal.storage.JobNodeStorage#addDataListener
/**
* 注册数据监听器.
*
* @param listener 数据监听器
*/
public void addDataListener(final TreeCacheListener listener) {
TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
cache.getListenable().addListener(listener);
}
订阅发生的时机在作业启动初始时的如下类中,作业启动初始化后面讲解
订阅缓存目录变更事件类
释放指定目录下缓存数据
//释放指定路径下的缓存数据
@Override
public void evictCacheData(final String cachePath) {
TreeCache cache = caches.remove(cachePath + "/");
if (null != cache) {
cache.close();
}
}
获取指定路径下的缓存数据
@Override
//获取指定路径的缓存数据
public Object getRawCache(final String cachePath) {
return caches.get(cachePath + "/");
}
获取数据
优先从缓存获取数据
@Override
public String get(final String key) {
//先优先从本地缓存获取数据
TreeCache cache = findTreeCache(key);
if (null == cache) {
return getDirectly(key);
}
//根据key从缓存获取数据
ChildData resultInCache = cache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
}
// 本地缓存获取不到,直接从注册中心获取
return getDirectly(key);
}
// 从缓存获取数据
private TreeCache findTreeCache(final String key) {
for (Entry<String, TreeCache> entry : caches.entrySet()) {
if (key.startsWith(entry.getKey())) {
return entry.getValue();
}
}
return null;
}
- 优先从本地缓存获取数据
- 如果本地缓存没有,从注册中心获取数据
从注册中心获取数据
//从注册中心获取数据
@Override
public String getDirectly(final String key) {
try {
return new String(client.getData().forPath(key), Charsets.UTF_8);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
return null;
}
}
子节点相关
获取子节点方法
@Override
public List<String> getChildrenKeys(final String key) {
try {
//获取节点的子节点
List<String> result = client.getChildren().forPath(key);
//节点倒序
Collections.sort(result, new Comparator<String>() {
@Override
public int compare(final String o1, final String o2) {
return o2.compareTo(o1);
}
});
return result;
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
return Collections.emptyList();
}
}
- 对获取的子节点按节点名称进行了排序,倒叙
获取子节点数量方法
@Override
public int getNumChildren(final String key) {
try {
// 获取节点的子节点数量
Stat stat = client.checkExists().forPath(key);
if (null != stat) {
return stat.getNumChildren();
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
return 0;
}
节点操作相关
判断节点是否存在
@Override
public boolean isExisted(final String key) {
try {
//判断节点是否存在
return null != client.checkExists().forPath(key);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
return false;
}
}
创建持久化节点
@Override
public void persist(final String key, final String value) {
try {
//不存在则创建持久化节点
if (!isExisted(key)) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
} else {
//存在则更新
update(key, value);
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
更新节点方法
@Override
public void update(final String key, final String value) {
try {
// 更新数据
client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
- 使用事务校验键节点存在才进行更新
创建临时节点方法
@Override
public void persistEphemeral(final String key, final String value) {
try {
//存在则删除节点
if (isExisted(key)) {
client.delete().deletingChildrenIfNeeded().forPath(key);
}
//创建临时节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
创建持久化顺序节点
该方法在当前版本未使用,可以不用关注
@Override
public String persistSequential(final String key, final String value) {
try {
//创建持久化的顺序节点
return client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(key, value.getBytes(Charsets.UTF_8));
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
return null;
}
创建临时顺序节点方法
@Override
public void persistEphemeralSequential(final String key) {
try {
//创建临时顺序节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
移除节点方法
@Override
public void remove(final String key) {
try {
//移除节点
client.delete().deletingChildrenIfNeeded().forPath(key);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
获取注册中心当前时间
@Override
public long getRegistryCenterTime(final String key) {
long result = 0L;
try {
persist(key, "");
//获取指定节点的注册中心时间
result = client.checkExists().forPath(key).getMtime();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
Preconditions.checkState(0L != result, "Cannot get registry center time.");
return result;
}
- 通过更新节点,获取节点最后的更新时间作为注册中心的当前时间
获取注册中心原生客户端
@Override
public Object getRawClient() {
//获取curator的client
return client;
}
关闭注册中心连接
@Override
public void close() {
// 先关闭缓存
for (Entry<String, TreeCache> each : caches.entrySet()) {
each.getValue().close();
}
waitForCacheClose();
//再关闭连接
CloseableUtils.closeQuietly(client);
}
/* TODO 等待500ms, cache先关闭再关闭client, 否则会抛异常
* 因为异步处理, 可能会导致client先关闭而cache还未关闭结束.
* 等待Curator新版本解决这个bug.
* BUG地址:https://issues.apache.org/jira/browse/CURATOR-157
*/
private void waitForCacheClose() {
try {
Thread.sleep(500L);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
- 先关闭缓存,清空缓存相关数据
- 再关闭client
网友评论