美文网首页技术干货微服务架构和实践
elasticJob 源码解析之自诊断恢复

elasticJob 源码解析之自诊断恢复

作者: 一滴水的坚持 | 来源:发表于2017-12-04 22:05 被阅读0次

什么叫做自诊断恢复??
在分布式的场景下由于网络、时钟等原因,可能导致 Zookeeper 的数据与真实运行的作业产生不一致,这种不一致通过正向的校验无法完全避免。需要另外启动一个线程定时校验注册中心数据与真实作业状态的一致性,即维持 Elastic-Job 的最终一致性。

public final class ReconcileService extends AbstractScheduledService

AbstractScheduledService是什么??这里是guava实现的接口。看该接口的描述:
​ AbstractScheduledService类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务,以及相应的startUp()和shutDown()方法。为了能够描述执行周期,你需要实现scheduler()方法。通常情况下,你可以使用AbstractScheduledService.Scheduler类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit)。

懂了,就是一个周期性任务。接着往下看。

@Override
protected void runOneIteration() throws Exception {
  LiteJobConfiguration config = configService.load(true);
  int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
  if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
    lastReconcileTime = System.currentTimeMillis();
    if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
      log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
      shardingService.setReshardingFlag();
    }
  }
}

@Override
protected Scheduler scheduler() {
  return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}

在runOneIteration中获取配置项reconcile-interval-minutes,默认十分钟,当有主节点,且不需要分片,且分片项中有下线的机器的时候,需要重设分片标记,在下次执行的时候重新分片。

/**
     * 查询是包含有分片节点的不在线服务器.
     * 
     * @return 是包含有分片节点的不在线服务器
     */
public boolean hasShardingInfoInOfflineServers() {
  List<String> onlineInstances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT);
  int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
  for (int i = 0; i < shardingTotalCount; i++) {
    //如果上线的机器列表中没有分片参数的节点,则返回true,需要重新分片。
    if (!onlineInstances.contains(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
      return true;
    }
  }
  return false;
}

fyi 这章比较水。。

相关文章

网友评论

    本文标题:elasticJob 源码解析之自诊断恢复

    本文链接:https://www.haomeiwen.com/subject/uqmfixtx.html