elastic job通过下面代码启动作业:
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
启动作业过程中,首先会更新作业配置:
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
该方法会做两件事情:
1、当前实例作业配置持久化到zk;
2、从zk重新加载作业配置覆盖当前实例作业配置。
持久化配置
configService.persist(liteJobConfig);
持久化配置时首先会校验作业配置信息:判断zk是否包含当前作业配置,如果存在且实现类不一致时,抛出异常:
throw new JobConfigurationException("Job conflict with register center. The job '%s' in register center's class is '%s', your job class is '%s'", liteJobConfig.getJobName(), liteJobConfigFromZk.get().getTypeConfig().getJobClass(), liteJobConfig.getTypeConfig().getJobClass());
校验通过后,以下两种情况会已当前实例作业配置覆盖zk作业配置:
zk不存在该作业配置;
zk存在,但作业配置包含需要覆盖zk作业配置属性(overwrite = true)
if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
// 覆盖zk作业配置
}
持久化作业配置即把LiteJobConfiguration
对象转换为json格式,然后创建zk节点jobName/config
,值为转换后的json数据。
重新加载配置
持久化作业配置后,当前实例会重新从zk服务器加载最新的作业配置信息,且不使用缓存。
configService.load(false);
该方法会直接从zk节点jobName/config
获取数据(json格式),而后把原始数据转换为LiteJobConfiguration
对象,并覆盖本地配置。
网友评论