ElasticJob是一个弹性的分布式任务调度框架,这里的分布式就是采用分片的来进行任务调度和业务执行的解耦,分片信息就是中间进行解耦的。ElasticJob任务调度框架调度触发执行的是分片,然后业务可以在框架触发对应分片信息的时候,增加自己业务的处理。分片这个思想挺不错的,把任务调度框架和实际业务解耦的相当好。
ShardingListenerManager:这个是分片监听的开始
@Override
public void start() {
addDataListener(new ShardingTotalCountChangedJobListener());
addDataListener(new ListenServersChangedJobListener());
}
ShardingListenerManager分片管理监听器在ElasticJob启动的就是开启了监听,这里是开启了2个监听器ShardingTotalCountChangedJobListener(分片节点总数变化监听器)和ListenServersChangedJobListener(服务器改变监听器)。
ShardingTotalCountChangedJobListener:分片总数变化监听器
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
int newShardingTotalCount = YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
shardingService.setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
}
}
获取zookeeper下发的分片个数变化事件的通知,判断新分片数和原分片数是否相等,不相等的话设置需要重新分片的标记,创建/leader/sharding/neccessary持久节点。
ListenServersChangedJobListener:服务器状态变更监听器
class ListenServersChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
shardingService.setReshardingFlag();
}
}
private boolean isInstanceChange(final Type eventType, final String path) {
return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
}
private boolean isServerChange(final String path) {
return serverNode.isServerPath(path);
}
}
instance和servers节点下有子节点变化会被监听到,这个时候也会去设置下需要重新分片的标记/leader/sharding/neccessary节点。
上面是触发生成了需要重新分片的标记,具体分片的执行时在任务执行的过程中。在作业任务执行的时候需要获取分片信息,这个时候会完成重新分片的执行。
public final void execute() {
//...
//获取当前作业服务器的分片上下文信息
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
//...
}
@Override
public ShardingContexts getShardingContexts() {
boolean isFailover = configService.load(true).isFailover();
if (isFailover) {
//TODO 获取故障转移到当前节点的分片信息
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
if (!failoverShardingItems.isEmpty()) {
//TODO 获取故障转移分片信息
return executionContextService.getJobShardingContext(failoverShardingItems);
}
}
shardingService.shardingIfNecessary();
List<Integer> shardingItems = shardingService.getLocalShardingItems();
if (isFailover) {
//TODO 删除本节点被故障转移的分片信息
shardingItems.removeAll(failoverService.getLocalTakeOffItems());
}
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
return executionContextService.getJobShardingContext(shardingItems);
}
- 获取当前节点服务器的分片上下文信息,如果开启了故障转移机制,会优先获取故障转移到当前作业服务器的分片任务。
shardingService.shardingIfNecessary();
public void shardingIfNecessary() {
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();//TODO 获取job存活运行实例
//不必要分片或作业运行实例为空,则不进行分片操作
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
//等待选举完成,非leader的话,等待完成分片
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
waitingOtherShardingItemCompleted();
JobConfiguration jobConfig = configService.load(false);
int shardingTotalCount = jobConfig.getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");//临时processing节点,表示当前正在进行分片
resetShardingInfo(shardingTotalCount);
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
- 如果需要就执行重新分片,这里是去判断/leader/sharding/neccessary节点是否存在,如果存在就需要执行重新分片操作。重新分片操作是只能在主服务器才能执行的,因为这个分片信息是需要所有作业集群统一。所以这里需要等待集群节点leader选举完成,然后从节点不会进行分片操作,从节点是阻塞等待主节点分片完成才退出。
//等待当前任务所有分片执行完成才去执行分片操作,这里是防止分片任务被重复执行,一个幂等操作
waitingOtherShardingItemCompleted();
- 等待当前任务所有分片任务执行完成才去执行分片操作,这里是防止分片任务被其它作业服务器重复执行,一个幂等操作的处理。
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
- 创建一个临时节点/leader/sharding/processing,表示正在执行重新分片操作。
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
- 获取重新分片策略,这里是可以在配置作业任务的时候进行配置jobShardingStrategyType参数。
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))
- 首先是分片策略(默认是平均分配策略)根据当前的作业服务器实例和分片个数,来计算出每个作业实例获取到的对应的分片信息。然后是在一个zookeeper事务中去创建对应的节点信息。
@RequiredArgsConstructor
class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
private final Map<JobInstance, List<Integer>> shardingResults;
@Override
public List<CuratorOp> createCuratorOperators(final TransactionOp transactionOp) throws Exception {
List<CuratorOp> result = new LinkedList<>();
for (Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
for (int shardingItem : entry.getValue()) {
result.add(transactionOp.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()));
}
}
result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)));
result.add(transactionOp.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)));
return result;
}
}
- 在这个zookeeper事务中,根据每个作业服务器实例获取到的分片信息,创建一个路径为/sharding/{item}/instance,value为instanceId的节点信息,待所有分片信息对应点的zookeeper节点创建完成之后,会删除掉/leader/sharding/neccessary和/leader/sharding/processing子节点信息。
网友评论