一、实体
分布式调度中重要的实体包括:任务、任务实例和执行节点。
任务:泛指任务的定义。
任务实例:泛指按照任务定义触发的任务运行时。同一个任务的多次触发则对应多个任务实例。
执行节点:任务实例运行的环境。
二、生命周期
各实体在系统运行过程中,实体状态发生改变,在不同状态下系统的各个模块将采取不同的操作以响应实体状态的变化。
1. 生命周期定义
(1)任务的生命周期
任务生命周期中的关键事件包括:新创建、启动、配置更新、暂停和删除。
(2)任务实例的生命周期
任务实例生命周期中的关键事件包括:准备执行、执行中、任务收尾、注册子任务、子任务完成(子任务状态可能是:成果、异常、超时)。
(3)执行节点生命周期
执行节点生命周期中的关键事件包括:节点上线、节点下线。
2. 程序设计
(1)接口定义
生命周期接口
public interface LiftCycle{
}
任务 生命周期
public interface JobLifeCycle extends LifeCycle{
void creating(JobConfig jobConfig);
void starting(JobConfig jobConfig);
void updating(JobConfig oldConfig, JobConfg newConfig);
void stopping(JobConfig jobConfg);
void deleting(JobConfig jobConfig);
}
任务实例 生命周期
public interface JobInstanceLifeCycle extends LifeCycle{
void starting(JobTrigger tgr);
void running(AbstactJob job, JobTrigger tgr);
void finishing(JobTrigger tgr, JobResult result);
void sumitSub(JobTrigger tgr, Class<? extends AbstractSubJob> subJobClass, Map<String, Object> dynamicParameter);
void finishSub(JobTrigger tgr, JobResult subResult);
}
执行节点生命周期
public interface NodeLifeCycle extends LifeCycle{
void joining (ClusterNode node, ClusterMeta meta);
void leaving(ClusterNode node, ClusterMeta meta)
}
(2)生命周期管理器
生命周期管理器负责生命周期实现类的注册, 和事件的触发。
public class LifeCycleManagerEntry<LCT extends LifeCycle> {
private final List<LCT> repository = new ArrayList<>();
pubic synchronized void register(LCT lifeCycleInstance){
if (!lrepository.contains(lifeCycleInstance)){
responsitory.add(lifeCycleInstance);
}
}
public void active(Consumer(? super LCT> action){
repository.stream().forEach(action);
}
}
这个类中有两个方法,一个用于注册生命周期实现,一个用于触发生命周期中的事件。
(3)生命周期中事件的触发
任务生命周期的触发在 JobService 中完成,这个类是针对任务的各种操作的入口。
任务启动触发的代码如下:
LifeCycleManager.job().active(l->l.starting(jobConfig));
其他事件的触发类似。
任务实例的生命周期事件的触发在 JavaExecutor类中。
return executorService.submit((Callable)()->{
LifeCycleManager.jobInstance().active(i->i.starting(trigger));
// ...
LifeCycleManager.jobInstance().active(i->i.finishing(trigger, result));
});
节点生命周期的触发在 ClusterRaftImpl 类中。
if (ClusterMembershipEvent.Type.MEMBER_ADDED.equals(event.type()){
LifeCycleManager.node.active(n->n.joining(ClusterNode.from(id, host, port), this.clusterMeta());
}
三、生命周期事件的处理
回顾一下存储设计:
nched.png
1. 任务 生命周期
(1)creating
创建 /config/job/{jobName}
节点,节点上 保存任务配置数据。
任务基本信息中包含 preferNodes和overload的配置。
(2)starting
修改/config/job/{jobName}
节点数据。
任务启动后,创建/runtime/job/{jobName}
节点。
触发任务分配操作,分配的结果,保存在两个节点:
一个是 /runtime/{jobName}/availableNodes
,一个是/runtime/node/{nodeName}/job/{jobName}
。
(3)stopping
修改/config/job/{jobName}
节点数据。
修改/runtime/{jobName}
节点数据为“STOP”。
删除/runtime/{jobName}
节点(需要等现有任务执行完成)。
删除/runtime/node/{nodeName}/job/{jobName}
数据。
(4)updating
修改/config/job/{jobName}
节点数据。
(5)deleting
修改/config/job/{jobName}
节点数据。
2. 任务实例 生命周期
(1)starting
任务被触发,准备开始执行时,执行starting
事件。
(2)running
任务被触发,开始执行时,执行starting
事件。
创建/runtime/job/{jobName}/instance/{jobInstanceId}
节点。
创建/runtime/node/{nodeName}/jobInstance/{jobInstanceId}
节点。
(3)finishing
如果状态为超时,触发故障转移。
如果正常或异常:
删除/runtime/job/{jobName}/instance/{jobInstanceId}
节点。
删除/runtime/node/{nodeName}/jobInstance/{jobInstanceId}
节点。
创建/archive/{date}/{jobName}/{jobInstanceId}
节点。
(4)submitSub
创建/runtime/job/{jobName}/instance/{jobInstanceId}/children/{childJobInstanceId}
节点。
更新children节点上子任务数量。
创建/runtime/node/{nodeName}/childJobInstance/{childJobInstanceId}
节点。
(5)finishSub
如果状态为超时,触发故障转移。
如果正常或异常:
触发父任务finishing
事件。
删除/runtime/node/{nodeName}/jobInstance/{jobInstanceId}
节点。
3. 节点 生命周期
(1)joining
创建/runtime/node/{nodeName}
节点。
触发任务分配:
更新/runtime/job/{jobName}/availableNodes/{nodeName}
节点。
更新/runtime/node/{nodeName}/job/{jobName}
节点。
(2)leaving
修改/runtime/node/{nodeName}
数据为OFFLINE。
触发故障转移,根据/runtime/{nodeName}/job
、/runtime/{nodeName}/jobInstance
、/runtime/{nodeName}/childJobInstance
获取受影响的任务、任务实例、子任务实例,触发任务重新分配。
删除/runtime/job/{jobName}/availableNodes/{nodeName}
节点。
故障转移完成后,删除/runtime/node/{nodeName}
节点。
网友评论