elastic job启动时,分片监听管理器io.elasticjob.lite.internal.sharding.ShardingListenerManager
会启动2个监听器,分别是任务分片总数改变监听器和服务改变监听器。
public void start() {
// 启动任务分片总数改变监听器
addDataListener(new ShardingTotalCountChangedJobListener());
// 启动服务改变监听器
addDataListener(new ListenServersChangedJobListener());
}
任务分片总数改变监听器
任务分片总数改变监听器监听zk上的任务配置节点jobName/config
,当该节点值改变时,实例判断当前配置的分片数是否和本地缓存分片数不一致,否则设置需要重新分片的标记,并修改本地缓存的分片数为最新值。
if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
shardingService.setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
设置需要重新分片的标记即在zk创建节点
jobName/leader/sharding/necessary
.
服务改变监听器
服务改变监听器会监听zk实例节点jobName/instances
的变化和服务节点jobName/servers
的变化,当改变条件满足时,会设置需要重新分片的标记。
实例节点变化需要满足:非实例节点数据值改变:
instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
服务节点只要发生改变,即满足条件:
serverNode.isServerPath(path);
网友评论