在分布式的场景下由于网络、时钟等原因,可能导致Zookeeper的数据与真实运行的作业产生不一致,这种不一致通过正向的校验无法完全避免。需要另外启动一个线程定时校验注册中心数据与真实作业状态的一致性,即维持Elastic-Job的最终一致性。
Elastic-Job在提供reconcileIntervalMinutes设置修复状态服务执行间隔分钟数,用于修复作业服务器不一致状态,默认每10分钟检测并修复一次。
调解不一致状态服务在elastic job中由io.elasticjob.lite.internal.reconcile.ReconcileService
提供。该类继承com.google.common.util.concurrent.AbstractScheduledService
实现了定时调度服务。
服务调度频率为每隔1分钟调度一次:
Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
不一致检查时间由任务配置io.elasticjob.lite.config.LiteJobConfiguration
字段reconcileIntervalMinutes
控制,默认值为10分钟。当最后一次检查时间与当前调度时间大于等于配置时间时,设置调度需要重新分片。
LiteJobConfiguration config = configService.load(true);
int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
lastReconcileTime = System.currentTimeMillis();
if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
shardingService.setReshardingFlag();
}
}
设置需要重新分片:创建zk节点
jobName/leader/sharding/necessary
.
重新分片标识设置之后,下一次任务调度执行时,主节点会重新对任务分片,从而到达最终一致性的目的。
AbstractScheduledService
类用于在运行时处理一些周期性的任务。子类可以实现runOneIteration()
方法定义一个周期执行的任务,以及相应的startUp()
和shutDown()
方法。为了能够描述执行周期,你需要实现scheduler()
方法。通常情况下,你可以使用AbstractScheduledService.Scheduler
类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit)
和newFixedDelaySchedule(initialDelay, delay, TimeUnit)
,类似于JDK并发包中ScheduledExecutorService
类提供的两种调度方式。如要自定义schedules
则可以使用CustomScheduler
类来辅助实现。
网友评论