MasterContext:
Master context
private Map<Channel, MasterWorkerHolder> workers=new ConcurrentHashMap<Channel, MasterWorkerHolder>();
private ApplicationContext applicationContext;
private Master master;
private Scheduler scheduler;
private Dispatcher dispatcher;
private Queue<JobElement> queue=new ArrayBlockingQueue<JobElement>(10000);//scheduled jobs (element id: jobId)
private Queue<JobElement> debugQueue=new ArrayBlockingQueue<JobElement>(1000);//debug jobs (element id: debugId)
private Queue<JobElement> manualQueue=new ArrayBlockingQueue<JobElement>(1000);//manual jobs (element id: historyId)
private MasterHandler handler;
private MasterServer server;
private ExecutorService threadPool=Executors.newCachedThreadPool();
private ScheduledExecutorService schedulePool=Executors.newScheduledThreadPool(5);
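Field descriptions:
- workers: the RPC channels to all workers, together with each worker's heartbeat information (memory, running jobs, heartbeat timestamp, and worker host identifier);
- applicationContext: the Spring context that manages the database-access objects;
- master: registers the listeners on the Dispatcher during initialization and runs two threads: one assigns jobs from the job pools to the currently most suitable worker and starts them; the other monitors the heartbeat connections to the workers and closes any worker whose heartbeat has been missing for more than one minute;
- scheduler: the scheduling API of the open-source Quartz framework; Zeus relies on Quartz's scheduling strategy to fire periodic jobs;
- dispatcher: the event dispatcher;
- queue: the pool of regularly scheduled jobs;
- debugQueue: the pool of debug jobs;
- manualQueue: the pool of manually triggered jobs;
- handler: wraps the RPC communication between the master and the web tier and workers;
- server: starts the master's Netty server;
- threadPool: the master's RPC thread pool;
- schedulePool: the master's scheduling thread pool;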
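The master's heartbeat-checking thread can be pictured with the minimal sketch below. It is not the Zeus implementation: it assumes a hypothetical WorkerHeartbeat holder that only records the last heartbeat time, keys workers by a host string instead of a Netty Channel, and takes the current time as a parameter so the one-minute rule is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for MasterWorkerHolder: only the heartbeat timestamp is modelled.
final class WorkerHeartbeat {
    volatile long lastHeartbeatMillis;

    WorkerHeartbeat(long lastHeartbeatMillis) {
        this.lastHeartbeatMillis = lastHeartbeatMillis;
    }
}

// Sketch of the heartbeat-checking thread; workers are keyed by host (assumption).
final class HeartbeatMonitor {
    private final Map<String, WorkerHeartbeat> workers = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a heartbeat message arrives from a worker.
    void onHeartbeat(String host, long nowMillis) {
        workers.computeIfAbsent(host, h -> new WorkerHeartbeat(nowMillis)).lastHeartbeatMillis = nowMillis;
    }

    // Removes every worker whose last heartbeat is older than the timeout and returns their hosts.
    List<String> sweep(long nowMillis) {
        List<String> closed = new ArrayList<>();
        for (Map.Entry<String, WorkerHeartbeat> e : workers.entrySet()) {
            if (nowMillis - e.getValue().lastHeartbeatMillis > timeoutMillis) {
                // remove(key, value) only succeeds if the holder was not replaced meanwhile
                if (workers.remove(e.getKey(), e.getValue())) {
                    closed.add(e.getKey()); // the real master would also close the channel here
                }
            }
        }
        return closed;
    }

    boolean isConnected(String host) {
        return workers.containsKey(host);
    }
}
```

In a long-running master, such a sweep could be driven by the schedulePool field, for example `schedulePool.scheduleAtFixedRate(() -> monitor.sweep(System.currentTimeMillis()), 0, 10, TimeUnit.SECONDS)`; the 10-second period is arbitrary here.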
JobController:
Job controller:
private final String jobId;
private CacheJobDescriptor cache;
private JobHistoryManager jobHistoryManager;
private GroupManager groupManager;
private Master master;
private MasterContext context;
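The controller of a job's status. Each job has exactly one JobController, which receives events and uses them to drive the job's state machine, thereby managing the job's lifecycle.
Zeus defines three ways of triggering a job: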
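/**
* The 3 types that trigger a job run
* 1: scheduled run
* 2: manual run (no chain reaction)
* 3: manual recovery (triggers a chain reaction)
*
*/
public enum TriggerType{
SCHEDULE(1),MANUAL(2),MANUAL_RECOVER(3);
private final int id;
TriggerType(int id){
this.id=id;
}
public String toName(){
if(id==1){
return "自动调度"; // "automatic scheduling"
}else if(id==2){
return "手动触发"; // "manual trigger"
}else if(id==3){
return "手动恢复"; // "manual recovery"
}
return "未知"; // "unknown"
}
}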
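The job statuses defined in Zeus:
/**
* WAIT:
* the job has not started, or not all of the jobs it depends on have finished
* RUNNING
* the job is running
* SUCCESS
* the job succeeded (transient state)
* FAILED
* the job failed (transient state)
* @author zhoufang
*
*/
public enum Status{
WAIT("wait"),RUNNING("running"),SUCCESS("success"),FAILED("failed");
// fields, constructor and accessors omitted
}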
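One way to read the four statuses as a state machine is the transition table in this sketch. The table is an assumption inferred from this article rather than code taken from Zeus: SUCCESS and FAILED are transient and fall back to WAIT, and WAIT may go straight to FAILED because failedEventHandle records a failure for a job whose dependency failed without ever running it. JobState and JobStateMachine are hypothetical names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Mirrors the four Zeus statuses under a hypothetical name.
enum JobState { WAIT, RUNNING, SUCCESS, FAILED }

final class JobStateMachine {
    // Assumed transition table (see the lead-in).
    private static final Map<JobState, Set<JobState>> ALLOWED = new EnumMap<>(JobState.class);
    static {
        ALLOWED.put(JobState.WAIT, EnumSet.of(JobState.RUNNING, JobState.FAILED));
        ALLOWED.put(JobState.RUNNING, EnumSet.of(JobState.SUCCESS, JobState.FAILED));
        ALLOWED.put(JobState.SUCCESS, EnumSet.of(JobState.WAIT)); // transient
        ALLOWED.put(JobState.FAILED, EnumSet.of(JobState.WAIT));  // transient
    }

    private JobState current = JobState.WAIT;

    JobState current() {
        return current;
    }

    // Moves to the next state, rejecting transitions the table does not allow.
    void moveTo(JobState next) {
        if (!ALLOWED.get(current).contains(next)) {
            throw new IllegalStateException(current + " -> " + next + " is not allowed");
        }
        current = next;
    }
}
```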
The broad job types defined in Zeus:
public enum JobRunType {
MapReduce("main"), Shell("shell"), Hive("hive");
// fields, constructor and accessors omitted
}
The job schedule types defined in Zeus:
public enum JobScheduleType {
Independent(0), Dependent(1), CyleJob(2);
// fields, constructor and accessors omitted
}
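If a job's schedule type is Independent, the job stands alone: it is triggered only by its own schedule (its cron expression) and is never started by other jobs. A Dependent job is instead triggered by its upstream jobs. Cycle jobs (CyleJob) are either daily jobs or hourly jobs.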
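How the schedule type decides whether an upstream success matters can be condensed into a small routing function that mirrors the early returns at the top of successEventHandle. The enums are trimmed, hypothetical copies (no constructor arguments) so the sketch compiles on its own, and SuccessAction is an invented name.

```java
// Trimmed, hypothetical copies of the Zeus enums.
enum ScheduleType { Independent, Dependent, CyleJob }
enum Trigger { SCHEDULE, MANUAL, MANUAL_RECOVER }

// What a controller does with an upstream success event.
enum SuccessAction { IGNORE, HANDLE_AS_CYCLE_JOB, CHECK_DEPENDENCIES }

final class SuccessRouting {
    static SuccessAction route(Trigger trigger, boolean auto, ScheduleType type, boolean dependsOnSender) {
        if (trigger == Trigger.MANUAL) return SuccessAction.IGNORE;        // manual runs never cascade
        if (!auto) return SuccessAction.IGNORE;                            // auto scheduling switched off
        if (type == ScheduleType.Independent) return SuccessAction.IGNORE; // driven only by its cron trigger
        if (type == ScheduleType.CyleJob) return SuccessAction.HANDLE_AS_CYCLE_JOB;
        // Dependent: only a success of one of its own dependencies is relevant
        return dependsOnSender ? SuccessAction.CHECK_DEPENDENCIES : SuccessAction.IGNORE;
    }
}
```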
public void handleEvent(AppEvent event) {
try {
if (event instanceof JobSuccessEvent) {
successEventHandle((JobSuccessEvent) event);
} else if (event instanceof JobFailedEvent) {
failedEventHandle((JobFailedEvent) event);
} else if (event instanceof ScheduleTriggerEvent) {
triggerEventHandle((ScheduleTriggerEvent) event);
} else if (event instanceof JobMaintenanceEvent) {
maintenanceEventHandle((JobMaintenanceEvent) event);
} else if (event.getType() == Events.Initialize) {
initializeEventHandle();
}
} catch (Exception e) {
// catch every exception so that a failure in this job does not affect the other jobs
ScheduleInfoLog.error("JobId:" + jobId + " handleEvent error", e);
}
}
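The catch-all in handleEvent matters because the dispatcher hands every event to every JobController. This sketch moves that try/catch into a hypothetical SimpleDispatcher to show the effect: a listener that throws is recorded and skipped, and delivery continues to the others. Zeus itself keeps the catch inside each controller's handleEvent; the isolation it buys is the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal dispatcher: every listener (controller) sees every event.
final class SimpleDispatcher<E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();
    private final List<String> errors = new ArrayList<>();

    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    void forwardEvent(E event) {
        for (Consumer<E> listener : listeners) {
            try {
                listener.accept(event);
            } catch (Exception e) {
                // one broken controller must not stop delivery to the rest
                errors.add(e.getMessage());
            }
        }
    }

    List<String> errors() {
        return errors;
    }
}
```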
Handling the JobSuccessEvent:
private void successEventHandle(JobSuccessEvent event) {
if (event.getTriggerType() == TriggerType.MANUAL) {
return;
}
String eId = event.getJobId();
JobDescriptor jobDescriptor = cache.getJobDescriptor();
if (jobDescriptor == null) {
autofix();
return;
}
if (!jobDescriptor.getAuto()) {
return;
}
if (jobDescriptor.getScheduleType() == JobScheduleType.Independent) {
return;
}
if (jobDescriptor.getScheduleType() == JobScheduleType.CyleJob) {
cycleJobSuccessHandle(event);
return;
}
if (!jobDescriptor.getDependencies().contains(eId)) {
return;
}
JobStatus jobStatus = null;
synchronized (this) {
jobStatus = groupManager.getJobStatus(jobId);
JobBean bean = groupManager.getUpstreamJobBean(jobId);
String cycle = bean.getHierarchyProperties().getProperty(
PropertyKeys.DEPENDENCY_CYCLE);
if (cycle != null && !"".equals(cycle)) {
Map<String, String> dep = jobStatus.getReadyDependency();
if ("sameday".equals(cycle)) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
String now = format.format(new Date());
for (String key : new HashSet<String>(dep.keySet())) {
String d = format.format(new Date(Long.valueOf(dep
.get(key))));
if (!now.equals(d)) {
jobStatus.getReadyDependency().remove(key);
ScheduleInfoLog.info("JobId:" + jobId
+ " remove overdue dependency " + key);
}
}
}
}
ScheduleInfoLog.info("JobId:" + jobId
+ " received a successed dependency job with jobId:"
+ event.getJobId());
ScheduleInfoLog.info("JobId:" + jobId + " the dependency jobId:"
+ event.getJobId() + " record it");
jobStatus.getReadyDependency().put(eId,
String.valueOf(new Date().getTime()));
groupManager.updateJobStatus(jobStatus);
}
boolean allComplete = true;
for (String key : jobDescriptor.getDependencies()) {
if (jobStatus.getReadyDependency().get(key) == null) {
allComplete = false;
break;
}
}
if (allComplete) {
ScheduleInfoLog.info("JobId:" + jobId
+ " all dependency jobs is ready,run!");
startNewJob(event.getTriggerType(), jobDescriptor, jobId);
} else {
ScheduleInfoLog.info("JobId:" + jobId
+ " some of dependency is not ready,waiting!");
}
}
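The dependency bookkeeping in successEventHandle (prune stale entries when DEPENDENCY_CYCLE is "sameday", record the new success, then check that every dependency is present) can be sketched as follows. DependencyTracker and its explicit nowMillis and zone parameters are assumptions added for testability; the original uses new Date() with a SimpleDateFormat in the JVM default time zone and persists the map via groupManager.updateJobStatus inside a synchronized block.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DependencyTracker {
    // upstream jobId -> epoch millis (as a String, like the original) when its success was recorded
    final Map<String, String> readyDependency = new HashMap<>();

    // "sameday" rule: drop successes that were not recorded on the current day.
    void pruneOverdue(long nowMillis, ZoneId zone) {
        LocalDate today = Instant.ofEpochMilli(nowMillis).atZone(zone).toLocalDate();
        readyDependency.entrySet().removeIf(e ->
                !Instant.ofEpochMilli(Long.parseLong(e.getValue())).atZone(zone).toLocalDate().equals(today));
    }

    void recordSuccess(String upstreamJobId, long nowMillis) {
        readyDependency.put(upstreamJobId, String.valueOf(nowMillis));
    }

    // The job may start only when every declared dependency has a recorded success.
    boolean allComplete(List<String> dependencies) {
        for (String dep : dependencies) {
            if (readyDependency.get(dep) == null) {
                return false;
            }
        }
        return true;
    }
}
```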
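Success events exist to drive the chain reaction to downstream jobs, so events whose trigger type is MANUAL are ignored. When a cycle job receives a success event that belongs to the job itself and the job has no dependencies, it is a standalone periodic job, and its next start time must be computed. If the current job depends on the job that just completed, the completed dependency is recorded in the current job's status and the job checks whether it can now run. [The line jobStatus.getReadyDependency().put(eId, event.getStatisEndTime()); in the cycle-job handler deserves a closer look.] A cycle job and its dependencies must normally have the same cycle and the same statistics end time. If the cycles differ, then, because only daily jobs depend on hourly jobs and never the other way round, the current job must be a daily job and the completed one an hourly job, so it is enough to check whether the completed hour is 23:00.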
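The cycle-matching rule just described can be written as a single predicate. JobCycle and CycleDependencyRule are hypothetical names, and the same-calendar-day check in the DAY/HOUR branch is an extra assumption beyond "the hour must be 23"; the real logic lives in cycleJobSuccessHandle, which this article does not show.

```java
import java.time.LocalDateTime;

// Hypothetical cycle enum; Zeus keeps the cycle on the job descriptor.
enum JobCycle { DAY, HOUR }

final class CycleDependencyRule {
    // Decides whether a completed upstream run satisfies the current cycle job.
    static boolean satisfies(JobCycle self, LocalDateTime selfStatisEnd,
                             JobCycle upstream, LocalDateTime upstreamStatisEnd) {
        if (self == upstream) {
            // same cycle: the statistics end times must match exactly
            return selfStatisEnd.equals(upstreamStatisEnd);
        }
        if (self == JobCycle.DAY && upstream == JobCycle.HOUR) {
            // a daily job waits for the 23:00 run of its hourly dependency;
            // requiring the same calendar day is an assumption of this sketch
            return upstreamStatisEnd.getHour() == 23
                    && upstreamStatisEnd.toLocalDate().equals(selfStatisEnd.toLocalDate());
        }
        // an hourly job depending on a daily job does not occur, per the text above
        return false;
    }
}
```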
Handling the JobFailedEvent:
private void failedEventHandle(JobFailedEvent event) {
JobDescriptor jobDescriptor = cache.getJobDescriptor();
if (jobDescriptor == null) {
autofix();
return;
}
if (!jobDescriptor.getAuto()) {
return;
}
if (jobDescriptor.getDependencies().contains(event.getJobId())) {// this job depends on the failed job
if (event.getTriggerType() == TriggerType.SCHEDULE) {// the failed dependency was triggered by SCHEDULE
// a dependency failed, so this job cannot run either; emit a failure of its own
// (message: "jobId:<this job> failed because dependency Job:<failed job> failed")
ZeusJobException exception = new ZeusJobException(event
.getJobException().getCauseJobId(), "jobId:"
+ jobDescriptor.getId() + " 失败,原因是依赖的Job:"
+ event.getJobId() + " 执行失败", event.getJobException());
ScheduleInfoLog.info("jobId:" + jobId
+ " is fail,as dependency jobId:"
+ event.getJobId() + " is failed");
// record the failure in the job history
JobHistory history = new JobHistory();
history.setStartTime(new Date());
history.setEndTime(new Date());
history.setExecuteHost(null);
history.setJobId(jobId);
history.setTriggerType(event.getTriggerType());
history.setStatus(Status.FAILED);
history.getLog().appendZeusException(exception);
history.setStatisEndTime(jobDescriptor.getStatisEndTime());
history.setTimezone(jobDescriptor.getTimezone());
history.setCycle(jobDescriptor.getCycle());
history = jobHistoryManager.addJobHistory(history);
jobHistoryManager.updateJobHistoryLog(history.getId(), history.getLog().getContent());
JobFailedEvent jfe = new JobFailedEvent(jobDescriptor.getId(), event.getTriggerType(), history, exception);
ScheduleInfoLog.info("JobId:" + jobId
+ " is fail,dispatch the fail event");
// broadcast the failure event to the other controllers
context.getDispatcher().forwardEvent(jfe);
}
}
}
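When one of this job's dependencies fails in a SCHEDULE-triggered run, this job is marked as failed as well (without running), and it broadcasts its own failure event to its downstream jobs.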
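Because each newly failed job broadcasts its own JobFailedEvent with the original SCHEDULE trigger type, a single failure ripples through the whole downstream graph. This sketch computes that set with a breadth-first walk. The downstream index (jobId to dependents) is a hypothetical structure, since in Zeus each JobController simply checks its own dependency list, and the sketch assumes every downstream job has auto scheduling enabled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FailureCascade {
    // Returns the failed job plus every job that fails because of it.
    static Set<String> failedJobs(String failedJobId, boolean triggeredBySchedule,
                                  Map<String, List<String>> downstream) {
        Set<String> failed = new LinkedHashSet<>();
        failed.add(failedJobId);
        if (!triggeredBySchedule) {
            return failed; // only SCHEDULE failures are propagated
        }
        Deque<String> pending = new ArrayDeque<>();
        pending.add(failedJobId);
        while (!pending.isEmpty()) {
            String job = pending.poll();
            for (String child : downstream.getOrDefault(job, List.of())) {
                // a newly failed job re-broadcasts the failure to its own dependents
                if (failed.add(child)) {
                    pending.add(child);
                }
            }
        }
        return failed;
    }
}
```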
Handling the ScheduleTriggerEvent:
/**
* Handles an event that fires a job on its schedule
*
* @param event
*/
private void triggerEventHandle(ScheduleTriggerEvent event) {
String eId = event.getJobId();
JobDescriptor jobDescriptor = cache.getJobDescriptor();
if (jobDescriptor == null) {// the job has been deleted; this is abnormal, so autofix
autofix();
return;
}
if (!eId.equals(jobDescriptor.getId())) {
return;
}
ScheduleInfoLog.info("JobId:" + jobId
+ " receive a timer trigger event,statisTime is:"
+ jobDescriptor.getStatisEndTime());
runJob(jobDescriptor);
}
When a timer trigger event arrives and it targets this job, the controller starts the job via runJob.
Handling the JobMaintenanceEvent:
private void maintenanceEventHandle(JobMaintenanceEvent event) {
if (event.getType() == Events.UpdateJob
&& jobId.equals(event.getJobId())) {
autofix();
}
}
When an UpdateJob maintenance event arrives for this job, the controller calls autofix to refresh the job's in-memory information.
Handling the Initialize event:
private void initializeEventHandle() {
JobStatus jobStatus = groupManager.getJobStatus(jobId);
if (jobStatus != null) {
// RUNNING at startup means the result of the last run was lost; retry immediately
if (jobStatus.getStatus() == Status.RUNNING) {
log.error("jobId=" + jobId
+ " 处于RUNNING状态,说明该JOB状态丢失,立即进行重试操作...");
// look up the history of the last run and kill the Hadoop job recorded in it
String operator = null;
if (jobStatus.getHistoryId() != null) {
JobHistory history = jobHistoryManager
.findJobHistory(jobStatus.getHistoryId());
// in rare cases the history lookup may return null
if (history != null
&& history.getStatus() == Status.RUNNING) {
operator = history.getOperator();
try {
JobContext temp = JobContext.getTempJobContext();
temp.setJobHistory(history);
new CancelHadoopJob(temp).run();
} catch (Exception e) {
// ignore
}
}
}
JobHistory history = new JobHistory();
history.setIllustrate("启动服务器发现正在running状态,判断状态已经丢失,进行重试操作"); // "found RUNNING at server startup; status assumed lost, retrying"
history.setOperator(operator);
history.setTriggerType(TriggerType.MANUAL_RECOVER);
history.setJobId(jobId);
context.getJobHistoryManager().addJobHistory(history);
master.run(history);
}
}
JobDescriptor jd = cache.getJobDescriptor();
// for a timer-driven (Independent) job, register its cron trigger
if (jd.getAuto() && jd.getScheduleType() == JobScheduleType.Independent) {
String cronExpression = jd.getCronExpression();
try {
CronTrigger trigger = new CronTrigger(jd.getId(), "zeus", cronExpression);
JobDetail detail = new JobDetail(jd.getId(), "zeus", TimerJob.class);
detail.getJobDataMap().put("jobId", jd.getId());
detail.getJobDataMap().put("dispatcher", context.getDispatcher());
context.getScheduler().scheduleJob(detail, trigger);
} catch (Exception e) {
if (e instanceof SchedulerException
&& "Based on configured schedule, the given trigger will never fire."
.equals(e.getMessage())) {
// the trigger will never fire again, so turn off auto scheduling for this job
jd.setAuto(false);
try {
groupManager.updateJob(jd.getOwner(), jd);
} catch (ZeusException e1) {
log.error("JobId:" + jobId + " 更新失败", e1); // "update failed"
}
cache.refresh();
} else {
log.error("JobId:" + jobId + " 定时程序启动失败", e); // "failed to start the timer"
}
}
}
// cycle job without dependencies: run it directly according to its start time
if (jd.getAuto()
&& jd.getScheduleType() == JobScheduleType.CyleJob
&& (jd.getDependencies() == null || jd.getDependencies()
.isEmpty())) {
initCycleJob(jd);
}
}
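The recovery step at the start of initializeEventHandle boils down to this: every job persisted as RUNNING is assumed lost and re-run as MANUAL_RECOVER, carrying over the operator from its last history record when one exists. A minimal sketch, with hypothetical StartupRecovery and Retry types and without the Hadoop kill step:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class StartupRecovery {
    enum State { WAIT, RUNNING, SUCCESS, FAILED }

    // A retry the master would run with TriggerType.MANUAL_RECOVER (hypothetical type).
    static final class Retry {
        final String jobId;
        final String operator; // null when no previous history was found

        Retry(String jobId, String operator) {
            this.jobId = jobId;
            this.operator = operator;
        }
    }

    // statuses: jobId -> persisted status; operators: jobId -> operator of the last history.
    static List<Retry> retriesAfterRestart(Map<String, State> statuses, Map<String, String> operators) {
        List<Retry> retries = new ArrayList<>();
        // TreeMap only makes the order deterministic for this sketch
        for (Map.Entry<String, State> e : new TreeMap<>(statuses).entrySet()) {
            if (e.getValue() == State.RUNNING) {
                // the previous run's result was lost while the master was down
                retries.add(new Retry(e.getKey(), operators.get(e.getKey())));
            }
        }
        return retries;
    }
}
```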