1. 介绍
1.1 简介
1.2 测试代码
public static void main(String[] args) {
// zokeeper注册中心
CoordinatorRegistryCenter registerCenter = createRegistryCenter();
// lite job 配置
LiteJobConfiguration liteJobCfg = createJobConfiguration();
new JobScheduler(registerCenter, liteJobCfg).init();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
// zookeeper地址和任务节点
new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
// 初始化zookeeper注册中心
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
// 任务作业配置
JobCoreConfiguration simpleCoreConfig =
JobCoreConfiguration.newBuilder("demoSimpleJob", "*/5 * * * * ?", 3).build();
// 任务
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
DemoJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
1.3 流程简介
1.3.1 初始化启动
启动quartz.png1.3.2 执行
执行.png1.3.3 zk节点
节点大体分配.png- 蓝色代表主节点
2. 初始化
2.1 初始化zk
public final class ZookeeperConfiguration {
/**
* 连接Zookeeper服务器的列表.
* 包括IP地址和端口号.
* 多个地址用逗号分隔.
* 如: host1:2181,host2:2181
*/
private final String serverLists;
/**
* 命名空间.
*/
private final String namespace;
/**
* 等待重试的间隔时间的初始值.
* 单位毫秒.
*/
private int baseSleepTimeMilliseconds = 1000;
/**
* 等待重试的间隔时间的最大值.
* 单位毫秒.
*/
private int maxSleepTimeMilliseconds = 3000;
/**
* 最大重试次数.
*/
private int maxRetries = 3;
/**
* 会话超时时间.
* 单位毫秒.
*/
private int sessionTimeoutMilliseconds;
/**
* 连接超时时间.
* 单位毫秒.
*/
private int connectionTimeoutMilliseconds;
/**
* 连接Zookeeper的权限令牌.
* 缺省为不需要权限验证.
*/
private String digest;
}
public void init() {
log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getServerLists())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
.namespace(zkConfig.getNamespace());
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
}
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
}
if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
client = builder.build();
client.start();
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
2.2 初始化配置
2.2.1 JobCoreConfiguration
private final String jobName;
private final String cron;
private final int shardingTotalCount;
/**
* 设置分片序列号和个性化参数对照表.
*
* <p>
* 分片序列号和参数用等号分隔, 多个键值对用逗号分隔. 类似map.
* 分片序列号从0开始, 不可大于或等于作业分片总数.
* 如:
* 0=a,1=b,2=c
* </p>
*
* @param shardingItemParameters 分片序列号和个性化参数对照表
*
* @return 作业配置构建器
*/
private final String shardingItemParameters;
private final String jobParameter;
// 是否开启失效转移
private final boolean failover;
// 是否执行错过作业
private final boolean misfire;
private final String description;
private final JobProperties jobProperties;
2.2.2 JobConfiguration
JobConfiguration.pngSimpleJobConfiguration
// 通用作业配置
private final JobCoreConfiguration coreConfig;
// 作业类型
private final JobType jobType = JobType.SIMPLE;
// 作业类
private final String jobClass;
2.2.3 LiteJobConfiguration
// 作业配置
private final JobTypeConfiguration typeConfig;
// 是否监控作业状态
private final boolean monitorExecution;
// 允许本机与注册中心的时间误差秒数
private final int maxTimeDiffSeconds;
// 监控通信端口
private final int monitorPort;
// 分片策略类
private final String jobShardingStrategyClass;
// 设置修复作业服务器不一致状态服务执行间隔分钟数
private final int reconcileIntervalMinutes;
// 设置作业是否启动时禁止
private final boolean disabled;
// 本地配置是否覆盖注册中心配置
private final boolean overwrite;
3. 启动
3.1 通用service
// 作业配置服务, 就是LiteJobConfiguration
new ConfigurationService(regCenter, jobName);
// 主节点服务
new LeaderService(regCenter, jobName);
// 可用作业服务器服务
new ServerService(regCenter, jobName);
// 运行作业节点服务
new InstanceService(regCenter, jobName);
// 分片服务
new ShardingService(regCenter, jobName);
// 作业运行服务
new ExecutionService(regCenter, jobName);
// 作业监控服务
new MonitorService(regCenter, jobName);
// 调解分布式作业不一致状态服务
new ReconcileService(regCenter, jobName);
// 作业失效转移服务
new FailoverService(regCenter, jobName)
3.2 JobScheduler
/**
* 作业调度
*
* @param regCenter
* @param liteJobConfig
* @param jobEventBus
* @param elasticJobListeners
*/
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
// 全局运行信息
// 作业名称 = 作业实例主键(IP + @-@ + xxxxx(JVM-PID)
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
this.liteJobConfig = liteJobConfig;
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
// 向AbstractDistributeOnceElasticJobListener类型listener, 添加注册中心
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
// 调度门面类
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
// 作业服务门面类
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
3.2.1 SchedulerFacade
public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
this.jobName = jobName;
// 作业配置服务, 就是LiteJobConfiguration
configService = new ConfigurationService(regCenter, jobName);
// 主节点服务
leaderService = new LeaderService(regCenter, jobName);
// 可用作业服务器服务
serverService = new ServerService(regCenter, jobName);
// 运行作业节点服务
instanceService = new InstanceService(regCenter, jobName);
// 分片服务
shardingService = new ShardingService(regCenter, jobName);
// 作业运行服务
executionService = new ExecutionService(regCenter, jobName);
// 作业监控服务
monitorService = new MonitorService(regCenter, jobName);
// 调解分布式作业不一致状态服务
reconcileService = new ReconcileService(regCenter, jobName);
// 节点监听
// 监听的${namespace}/${job-name}节点
listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}
- ListenerManager 监听${namespace}/${job-name}节点, 重要!!!
3.2.2 LiteJobFacade
public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final JobEventBus jobEventBus) {
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
executionContextService = new ExecutionContextService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
failoverService = new FailoverService(regCenter, jobName);
this.elasticJobListeners = elasticJobListeners;
this.jobEventBus = jobEventBus;
}
3.3 init
public void init() {
// 更新并获得 ${job-name}/config 的节点配置数据(LiteJobConfiguration)
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
// 全局运行信息
// job-name : 分片数
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
// 创建JobScheduleController 封装了quartz的信息
JobScheduleController jobScheduleController = new JobScheduleController(
// 创建org.quartz.Scheduler信息
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
// 全局运行信息
// job-name : JobScheduleController : registerCenter
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
// 启动前操作
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
// 启动本地quartz
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
- schedulerFacade.updateJobConfiguration(liteJobConfig); 添加或者更新${namespace}/${job-name}/config节点信息
3.3.1 createScheduler
private Scheduler createScheduler() {
Scheduler result;
try {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
return result;
}
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
result.put("org.quartz.jobStore.misfireThreshold", "1");
result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}
3.3.2 createJobDetail
- 统一LiteJob执行
private JobDetail createJobDetail(final String jobClass) {
// 设置执行任务
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
3.3.3 registerStartUpInfo !!!
public void registerStartUpInfo(final boolean enabled) {
// 启动zk节点监听器
listenerManager.startAllListeners();
// 选主: ${namespace}/${job-name}/leader/election
// 监听器发现节点变化: ${namespace}/${job-name}/leader/sharding/necessary
leaderService.electLeader();
// 添加作业服务器节点: ${namespace}/${job-name}/server节点
serverService.persistOnline(enabled);
// 运行实例: ${namespace}/${job-name}/instances
instanceService.persistOnline();
// 重新设置需要分片标记: ${namespace}/${job-name}/leader/sharding/necessary
shardingService.setReshardingFlag();
// 作业监控服务
monitorService.listen();
// 是否开启动态监听是否重新分片服务
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
3.3.3.1 leaderService.electLeader()
主节点选举成功, 回调LeaderElectionExecutionCallback
/**
* 选举主节点.
*/
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}
/**
* 在主节点执行操作.
*
* @param latchNode 分布式锁使用的作业节点名称
* @param callback 执行操作的回调
*/
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
handleException(ex);
}
}
@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
// 判断是否有${namespace}/${job-name}/leader/election/instance节点
if (!hasLeader()) {
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}
/**
* 判断是否已经有主节点.
*
* @return 是否已经有主节点
*/
public boolean hasLeader() {
return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
}
- 选主相当于一个分布式的锁,保证一个节点成为主节点
3.3.3.2 reconcileService.startAsync()
动态监听是否重新分片服务
public final class ReconcileService extends AbstractScheduledService {
private long lastReconcileTime;
private final ConfigurationService configService;
private final ShardingService shardingService;
private final LeaderService leaderService;
public ReconcileService(final CoordinatorRegistryCenter regCenter, final String jobName) {
lastReconcileTime = System.currentTimeMillis();
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
leaderService = new LeaderService(regCenter, jobName);
}
@Override
protected void runOneIteration() throws Exception {
LiteJobConfiguration config = configService.load(true);
int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
// 检测间隔时间
if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
lastReconcileTime = System.currentTimeMillis();
//
if (leaderService.isLeaderUntilBlock() //主节点操作
&& !shardingService.isNeedSharding() // 当前不需要分片
&& shardingService.hasShardingInfoInOfflineServers()) { // 可作业实例和"分片节点作业实例"不一致
log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
shardingService.setReshardingFlag();
}
}
}
/**
* 一分钟执行一次
*/
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
}
3.4 执行LiteJob
quartz定时执行
Executor.pngpublic final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
public final void execute() {
try {
// 检查本机与注册中心的时间误差秒数是否在允许范围
// 误差范围外还继续执行
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// 获得分片
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
// JobEvent
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
}
// 如果当前分片项仍在运行
// 设置任务被错过执行的标记: ${namespace}/${job-name}/sharding/${item}/misfire
// 并停止执行
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
// before ElasticJobListener
try {
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
// 不需要重新分片
// 存在错过执行任务
// 并且config可以执行错过任务
// while循环, 防止错误再次启动而未执行的任务
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
// 清除错过任务标记
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
// 重新执行
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
// 开启失败转移
// 当前任务执行结束
// ${namespace}/${job-name}/leader/FAILOVER/latch下选取主节点
// 主节点-锁, 获得${namespace}/${job-name}/leader/FAILOVER/${items} 节点下一个失败转移分片
// FailoverLeaderExecutionCallback设置${namespace}/${job-name}/sharding/${item}/FAILOVER = 当前作业实例
// 删除${namespace}/${job-name}/leader/FAILOVER/${items}
// 重新调度任务
jobFacade.failoverIfNecessary();
try {
// after ElasticJobListener
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
}
3.4.1 分片
@Override
public ShardingContexts getShardingContexts() {
boolean isFailover = configService.load(true).isFailover();
// 开启失败转移, 获得失败转移的分片
if (isFailover) {
// 获得${namespace}/${job-name}/sharding/${item}/FAILOVER = 当前运行实例, 的分片
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
if (!failoverShardingItems.isEmpty()) {
return executionContextService.getJobShardingContext(failoverShardingItems);
}
}
// 分片
shardingService.shardingIfNecessary();
// 获得${namespace}/${job-name}/sharding/${item}/instance = 当前运行实例, 的分片
List<Integer> shardingItems = shardingService.getLocalShardingItems();
if (isFailover) {
// 移除失效的分片, 交给获得失败转移的作业实例执行
shardingItems.removeAll(failoverService.getLocalTakeOffItems());
}
// 移除被禁用的分片
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
return executionContextService.getJobShardingContext(shardingItems);
}
public void shardingIfNecessary() {
// 获得可作业的运行实例: ${namespace}/${job-name}/instances下节点
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
// 不需要分片 || 没有运行实例
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
// 不是主节点
// 等待主节点, 分片结束
// 就是不存在 ${namespace}/${job-name}/leader/sharding/necessary 和 processing节点
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
/**--------------------**/
/** 主节点分片 **/
/**--------------------**/
// 等待正在运行的分片实例结束
// 判断running节点
waitingOtherShardingItemCompleted();
LiteJobConfiguration liteJobConfig = configService.load(false);
int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
// 正在分片节点
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
// 创建或者刷新分片节点${namespace}/${job-name}/sharding/${item}
resetShardingInfo(shardingTotalCount);
// 获得分片策略
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
// JobShardingStrategy分片
// PersistShardingInfoTransactionExecutionCallback
// 创建对应的创建或者刷新分片节点${namespace}/${job-name}/sharding/${item}/instance = 当前运行实例
// 移除ShardingNode.NECESSARY 和 ShardingNode.PROCESSING
// 释放掉等待分片的从节点
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
3.4.2 运行前和后zk节点处理
- 设置 ${namespace}/${job-name}/sharding/${item}/running
- 执行结束清除
删除${namespace}/${job-name}/sharding/${item}/running
删除${namespace}/${job-name}/sharding/${item}/failover
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
// 空分片不处理
if (shardingContexts.getShardingItemParameters().isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
}
return;
}
// 作业运行标识节点
// ${namespace}/${job-name}/sharding/${item}/running
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
// 事件记录
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
}
try {
process(shardingContexts, executionSource);
} finally {
// TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
// 作业结束
// 删除${namespace}/${job-name}/sharding/${item}/running
// 删除${namespace}/${job-name}/sharding/${item}/failover
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
// event 结束处理
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
}
} else {
if (shardingContexts.isAllowSendJobEvent()) {
// event 处理错误信息
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
}
}
}
}
3.4.3 执行job
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
// 一个分片直接执行了
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
process(shardingContexts, item, jobExecutionEvent);
return;
}
// 闭锁, 等待所有分片任务执行结束, 返回
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) {
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
process(shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(startEvent);
}
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
JobExecutionEvent completeEvent;
try {
// 执行job
process(new ShardingContext(shardingContexts, item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(completeEvent);
}
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// 异常处理
// CHECKSTYLE:ON
completeEvent = startEvent.executionFailure(cause);
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtil.transform(cause));
jobExceptionHandler.handleException(jobName, cause);
}
}
4. 其他
4.1 监听器
- ListenerManager管理
- 基类
public abstract class AbstractJobListener implements TreeCacheListener {
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData) {
return;
}
String path = childData.getPath();
if (path.isEmpty()) {
return;
}
dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
}
protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
- 主节点相关: ElectionListenerManager.start()
/**
* 重新选主
*/
class LeaderElectionJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// 没有关闭
// 不是主节点 && 当前作业节点存活
// ${namespace}/${job-name}/leander/election/instace 移除了 && 当前作业节点可用
if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
// 重新选举主
leaderService.electLeader();
}
}
private boolean isActiveElection(final String path, final String data) {
return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
}
private boolean isPassiveElection(final String path, final Type eventType) {
return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp());
}
private boolean isLeaderCrashed(final String path, final Type eventType) {
return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
}
private boolean isLocalServerEnabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(data);
}
}
//-------------------------------------------------------------
/**
* 当前job节点失效, 并且是主节点删除${namespace}/${job-name}/leander/election/instace 主节点
*/
class LeaderAbdicationJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
}
}
private boolean isLocalServerDisabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
}
}
- 分片相关: ShardingListenerManager.start()
/**
* 重新分片标识
*/
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// config节点变化
// 总分片数变化
if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
// 重新分片
shardingService.setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
}
}
//-------------------------------------------------------------
class ListenServersChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// 作业正在运行
// 作业节点server变化 || 作业实例insatnces变化
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
// 需要重新分片
shardingService.setReshardingFlag();
}
}
private boolean isInstanceChange(final Type eventType, final String path) {
return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
}
private boolean isServerChange(final String path) {
return serverNode.isServerPath(path);
}
}
- 失效转移: FailoverListenerManager.start()
class JobCrashedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// 开启失败转移
// 删除类型
// ${namespace}/${job-name}/instances 作业实例无效
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
// 获得无效作业实例id
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
// 相同就算了
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
// 获得无效实例的需要执行的失效转移分片
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
for (int each : failoverItems) {
// 添加leader下
failoverService.setCrashedFailoverFlag(each);
// 设置当前实例执行失效转移分片
failoverService.failoverIfNecessary();
}
} else {
for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
class FailoverSettingsChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// 不需要进行失效转移了
// 删除${namespace}/${job-name}/sharding/${item}/failover节点
if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) {
failoverService.removeFailoverInfo();
}
}
}
- 其他不做介绍了
4.2 失效转移
运行中的某一作业节点崩溃,崩溃节点的分片会被分配到其他存活作业节点执行。执行崩溃节点分片,不会重新分片,只会在下次作业重新启动时重新分片。
- job-node1、job-node2、job-node3, 9个分片
- job-node1失效, job-node2、job-node3的JobCrashedJobListener 监听到
- job-node2或job-node3会优先添加job-node1的分片到${namespace}/${job-name}/leader/failover/${item},不存在在添加${namespace}/${job-name}/sharding下面的分片
- 因为只有在节点执行完${namespace}/${job-name}/sharding下面的分片, 才会添加${namespace}/${job-name}/leader/failover/${item}失效分片
public void failoverIfNecessary() {
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
private boolean needFailover() {
// 失效后failoverService.setCrashedFailoverFlag(each)添加了失效分片
return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
// 当前作业运行完${namespace}/${job-name}/sharding下面的分片
&& !JobRegistry.getInstance().isJobRunning(jobName);
}
- leader(相当于分布式锁), 领取一个失效分片执行
- FailoverLeaderExecutionCallback.execute
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
- job-node2和job-node3分别领取一个失败转移的分片, 还会剩下一个
- 等待下次重新启动任务, 3.4获得剩余分片执行
jobFacade.failoverIfNecessary();
网友评论