Zeus-Master-Main Components

Author: PunyGod | Published: 2016-07-07 15:18

    MasterContext:

    The Master's context object:

    private Map<Channel, MasterWorkerHolder> workers=new ConcurrentHashMap<Channel, MasterWorkerHolder>();
    private ApplicationContext applicationContext;
    private Master master;
    private Scheduler scheduler;
    private Dispatcher dispatcher;
    private Queue<JobElement> queue=new ArrayBlockingQueue<JobElement>(10000);//scheduled jobs, identified by jobId
    private Queue<JobElement> debugQueue=new ArrayBlockingQueue<JobElement>(1000);//debug jobs, identified by debugId
    private Queue<JobElement> manualQueue=new ArrayBlockingQueue<JobElement>(1000);//manual jobs, identified by historyId
    private MasterHandler handler;
    private MasterServer server;
    private ExecutorService threadPool=Executors.newCachedThreadPool();
    private ScheduledExecutorService schedulePool=Executors.newScheduledThreadPool(5);
    

    Field descriptions:

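    • workers: maintains the RPC channels to all workers, together with each worker's heartbeat information (memory, running jobs, heartbeat timestamp, and worker host identifier);
    • applicationContext: the Spring ApplicationContext, which manages the database access objects;
    • master: sets up the listener on the Dispatcher and maintains two threads. One thread takes jobs from the job queues, assigns each to the currently most suitable worker machine, and starts it. The other monitors the heartbeat connections to the workers and closes any connection whose heartbeat has timed out for more than one minute;
    • scheduler: the scheduling API of the open-source Quartz framework; Zeus relies on Quartz's scheduling strategy to schedule periodic jobs;
    • dispatcher: the event dispatcher;
    • queue: the queue of regularly scheduled jobs;
    • debugQueue: the queue of debug jobs;
    • manualQueue: the queue of manually triggered jobs;
    • handler: wraps the RPC communication between the master and both the web front end and the workers;
    • server: the Netty server started on the master node;
    • threadPool: the master node's thread pool for RPC communication;
    • schedulePool: the master node's thread pool for scheduled tasks.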
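The two background threads described for `master` can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not Zeus's code. `HeartbeatInfo` and `WorkerRegistry` are hypothetical names, and workers are keyed by a host string instead of a Netty `Channel`. The text does not define "most suitable worker", so the sketch assumes it means the lowest memory usage, with ties broken by fewer running jobs. Only the one-minute heartbeat timeout comes from the description above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the heartbeat data a MasterWorkerHolder carries.
final class HeartbeatInfo {
    final String host;
    final double memRate;       // assumed metric: fraction of memory in use (0.0 to 1.0)
    final int runningJobs;
    final long timestampMillis; // when the last heartbeat arrived

    HeartbeatInfo(String host, double memRate, int runningJobs, long timestampMillis) {
        this.host = host;
        this.memRate = memRate;
        this.runningJobs = runningJobs;
        this.timestampMillis = timestampMillis;
    }
}

final class WorkerRegistry {
    static final long HEARTBEAT_TIMEOUT_MILLIS = 60_000L; // one minute

    // Keyed by host here; MasterContext keys its map by the Netty Channel.
    private final Map<String, HeartbeatInfo> workers = new ConcurrentHashMap<>();

    void onHeartbeat(HeartbeatInfo info) {
        workers.put(info.host, info);
    }

    // Called by the heartbeat-monitor thread: forget workers that have been silent
    // longer than the timeout (Zeus would also close the channel).
    List<String> evictTimedOut(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, HeartbeatInfo> e : workers.entrySet()) {
            if (nowMillis - e.getValue().timestampMillis > HEARTBEAT_TIMEOUT_MILLIS) {
                evicted.add(e.getKey());
            }
        }
        evicted.forEach(workers::remove);
        return evicted;
    }

    // Called by the job-assignment thread: assumed rule is lowest memory usage,
    // then fewest running jobs.
    Optional<String> pickWorker() {
        return workers.values().stream()
                .min(Comparator.comparingDouble((HeartbeatInfo h) -> h.memRate)
                        .thenComparingInt(h -> h.runningJobs))
                .map(h -> h.host);
    }
}
```

A real selection rule would likely also skip workers whose memory usage is above some threshold. The comparator is where such a policy would go.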

    JobController:

    The job controller:

    private final String jobId;
    private CacheJobDescriptor cache;
    private JobHistoryManager jobHistoryManager;
    private GroupManager groupManager;
    private Master master;
    private MasterContext context;
    

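    JobController is the state controller of a job, and each job has exactly one JobController. It moves the job's state machine from state to state in response to the events it receives, and in doing so manages the job's whole lifecycle.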
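Each job has its own controller, so the dispatcher can broadcast every event to all controllers and let each one decide what concerns it. The sketch below makes a few assumptions. It uses a string-typed event, whereas Zeus dispatches on event classes. `SimpleEvent`, `SimpleDispatcher`, and `PerJobController` are hypothetical names. The sketch shows two properties of the design. First, events are broadcast to every controller. Second, exceptions are caught inside each controller, as JobController.handleEvent does, so that one job's error cannot stop delivery to the others.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified event: a type name plus the id of the job it concerns.
final class SimpleEvent {
    final String type;
    final String jobId;

    SimpleEvent(String type, String jobId) {
        this.type = type;
        this.jobId = jobId;
    }
}

interface SimpleEventHandler {
    void handleEvent(SimpleEvent event);
}

// Broadcasts every event to every registered handler.
final class SimpleDispatcher {
    private final List<SimpleEventHandler> handlers = new ArrayList<>();

    void addHandler(SimpleEventHandler handler) {
        handlers.add(handler);
    }

    void forwardEvent(SimpleEvent event) {
        // iterate over a copy in case a handler registers another handler mid-dispatch
        for (SimpleEventHandler handler : new ArrayList<>(handlers)) {
            handler.handleEvent(event);
        }
    }
}

// One controller per job.
final class PerJobController implements SimpleEventHandler {
    final String jobId;
    int triggered; // how many times this job would have been started

    PerJobController(String jobId) {
        this.jobId = jobId;
    }

    @Override
    public void handleEvent(SimpleEvent event) {
        try {
            process(event);
        } catch (Exception e) {
            // swallow and log, so the dispatcher keeps delivering to other jobs
            System.err.println("JobId:" + jobId + " handleEvent error: " + e.getMessage());
        }
    }

    private void process(SimpleEvent event) {
        switch (event.type) {
            case "trigger":
                if (jobId.equals(event.jobId)) {
                    triggered++; // stands in for starting the job
                }
                break;
            default:
                throw new IllegalArgumentException("unsupported event type: " + event.type);
        }
    }
}
```

Broadcasting keeps the dispatcher simple. The cost is that every controller sees every event, so each handler must filter cheaply.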

    Zeus defines three ways to trigger a job:

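    /**
     * The 3 types of triggers that start a job:
     * 1: scheduled execution
     * 2: manual execution (no chain reaction)
     * 3: manual recovery (triggers a chain reaction)
     */
    public enum TriggerType {
       SCHEDULE(1), MANUAL(2), MANUAL_RECOVER(3);

       private final int id;

       TriggerType(int id) {
          this.id = id;
       }

       public String toName() {
          if (id == 1) {
             return "Automatic scheduling";
          } else if (id == 2) {
             return "Manual trigger";
          } else if (id == 3) {
             return "Manual recovery";
          }
          return "Unknown";
       }
    }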
    

    The job statuses defined in Zeus:

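    /**
     * WAIT:
     *     The job has not started, or not all of the jobs it depends on have completed
     * RUNNING
     *     The job is running
     * SUCCESS
     *     The job ran successfully (transient state)
     * FAILED
     *     The job failed (transient state)
     * @author zhoufang
     *
     */
    public enum Status {
       WAIT("wait"), RUNNING("running"), SUCCESS("success"), FAILED("failed");

       private final String id;

       Status(String id) {
          this.id = id;
       }
    }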
    

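The javadoc implies a small state machine. The transition table below is an assumption drawn from the comments and handlers in this article, not code from Zeus. WAIT goes to RUNNING, or straight to FAILED when a dependency fails (failedEventHandle records FAILED without running the job). RUNNING ends in SUCCESS or FAILED. The two "transient" states are assumed to fall back to WAIT for the next cycle. `JobState` and `JobStateMachine` are hypothetical names.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical job states mirroring Zeus's Status enum.
enum JobState {
    WAIT, RUNNING, SUCCESS, FAILED;

    // Assumed transition table; not taken from Zeus.
    Set<JobState> next() {
        switch (this) {
            case WAIT:
                return EnumSet.of(RUNNING, FAILED); // FAILED: an upstream job failed
            case RUNNING:
                return EnumSet.of(SUCCESS, FAILED);
            default:
                return EnumSet.of(WAIT);            // SUCCESS/FAILED are transient
        }
    }

    boolean canMoveTo(JobState target) {
        return next().contains(target);
    }
}

final class JobStateMachine {
    private JobState state = JobState.WAIT;

    JobState state() {
        return state;
    }

    // Reject any transition that the table does not allow.
    void moveTo(JobState target) {
        if (!state.canMoveTo(target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        state = target;
    }
}
```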
    The broad job types defined in Zeus:

    public enum JobRunType {
       MapReduce("main"), Shell("shell"), Hive("hive");

       private final String id;

       JobRunType(String id) {
          this.id = id;
       }
    }
    

    The job scheduling types defined in Zeus:

    public enum JobScheduleType {
       Independent(0), Dependent(1), CyleJob(2);

       private final int id;

       JobScheduleType(int id) {
          this.id = id;
       }
    }
    

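    If a job's schedule type is Independent, the job stands alone: it is triggered only by its own schedule period and is never started by other jobs. A Dependent job, by contrast, is triggered by the jobs upstream of it. Cycle jobs come in two kinds: daily jobs and hourly jobs.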
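The schedule type decides which events a job reacts to. The following condenses the early-return checks that successEventHandle performs into a single function. The enums and the `SuccessRouting` class are simplified, hypothetical stand-ins for the Zeus types.

```java
import java.util.Set;

// Simplified, hypothetical stand-ins for JobScheduleType and TriggerType.
enum ScheduleKind { INDEPENDENT, DEPENDENT, CYCLE }

enum TriggerKind { SCHEDULE, MANUAL, MANUAL_RECOVER }

final class SuccessRouting {
    enum Action { IGNORE, CYCLE_HANDLING, RECORD_DEPENDENCY }

    // What a job does when some upstream job reports success.
    static Action onUpstreamSuccess(ScheduleKind kind, boolean auto, Set<String> dependencies,
                                    String upstreamJobId, TriggerKind trigger) {
        if (trigger == TriggerKind.MANUAL) {
            return Action.IGNORE;            // manual runs never start a chain reaction
        }
        if (!auto) {
            return Action.IGNORE;            // automatic scheduling is switched off
        }
        if (kind == ScheduleKind.INDEPENDENT) {
            return Action.IGNORE;            // fired only by its own timer
        }
        if (kind == ScheduleKind.CYCLE) {
            return Action.CYCLE_HANDLING;    // delegated, like cycleJobSuccessHandle
        }
        return dependencies.contains(upstreamJobId)
                ? Action.RECORD_DEPENDENCY   // Dependent job: remember the finished upstream
                : Action.IGNORE;             // not one of our dependencies
    }
}
```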

    public void handleEvent(AppEvent event) {
       try {
          if (event instanceof JobSuccessEvent) {
             successEventHandle((JobSuccessEvent) event);
          } else if (event instanceof JobFailedEvent) {
             failedEventHandle((JobFailedEvent) event);
          } else if (event instanceof ScheduleTriggerEvent) {
             triggerEventHandle((ScheduleTriggerEvent) event);
          } else if (event instanceof JobMaintenanceEvent) {
             maintenanceEventHandle((JobMaintenanceEvent) event);
          } else if (event.getType() == Events.Initialize) {
             initializeEventHandle();
          }
       } catch (Exception e) {
          // catch every exception, so that an error in this job does not affect other jobs
          ScheduleInfoLog.error("JobId:" + jobId + " handleEvent error", e);
       }
    }
    

    Handling the JobSuccessEvent:

    private void successEventHandle(JobSuccessEvent event) {
       if (event.getTriggerType() == TriggerType.MANUAL) {
          return;
       }
       String eId = event.getJobId();
       JobDescriptor jobDescriptor = cache.getJobDescriptor();
       if (jobDescriptor == null) {
          autofix();
          return;
       }
       if (!jobDescriptor.getAuto()) {
          return;
       }
       if (jobDescriptor.getScheduleType() == JobScheduleType.Independent) {
          return;
       }
       if (jobDescriptor.getScheduleType() == JobScheduleType.CyleJob) {
          cycleJobSuccessHandle(event);
          return;
       }
       if (!jobDescriptor.getDependencies().contains(eId)) {
          return;
       }
       JobStatus jobStatus = null;
       synchronized (this) {
          jobStatus = groupManager.getJobStatus(jobId);
          JobBean bean = groupManager.getUpstreamJobBean(jobId);
          String cycle = bean.getHierarchyProperties().getProperty(
                PropertyKeys.DEPENDENCY_CYCLE);
          if (cycle != null && !"".equals(cycle)) {
             Map<String, String> dep = jobStatus.getReadyDependency();
             if ("sameday".equals(cycle)) {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
                String now = format.format(new Date());
                for (String key : new HashSet<String>(dep.keySet())) {
                   String d = format.format(new Date(Long.valueOf(dep
                         .get(key))));
                   if (!now.equals(d)) {
                      jobStatus.getReadyDependency().remove(key);
                      ScheduleInfoLog.info("JobId:" + jobId
                            + " remove overdue dependency " + key);
                   }
                }
             }
          }
          ScheduleInfoLog.info("JobId:" + jobId
                + " received a successed dependency job with jobId:"
                + event.getJobId());
          ScheduleInfoLog.info("JobId:" + jobId + " the dependency jobId:"
                + event.getJobId() + " record it");
          jobStatus.getReadyDependency().put(eId,
                String.valueOf(new Date().getTime()));
          groupManager.updateJobStatus(jobStatus);
       }
       boolean allComplete = true;
       for (String key : jobDescriptor.getDependencies()) {
          if (jobStatus.getReadyDependency().get(key) == null) {
             allComplete = false;
             break;
          }
       }
       if (allComplete) {
          ScheduleInfoLog.info("JobId:" + jobId
                + " all dependency jobs is ready,run!");
          startNewJob(event.getTriggerType(), jobDescriptor, jobId);
       } else {
          ScheduleInfoLog.info("JobId:" + jobId
                + " some of dependency is not ready,waiting!");
       }
    }
    

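    The success event exists to drive the chain reaction into downstream jobs, so events whose trigger type is MANUAL are ignored. Suppose a cycle job receives a success event that turns out to be its own, and the job has no dependencies. Then it is an independent periodic job, and it must compute its next start time. If the current job depends on the job that just completed, it records that completed dependency and then checks whether it can now run. [The line jobStatus.getReadyDependency().put(eId, event.getStatisEndTime()); deserves a closer look.] A cycle job and the jobs it depends on must have the same cycle, and their statistic end times must match as well. If the cycles differ, the current job must be a daily job and the completed one an hourly job. This follows because daily jobs may depend on hourly jobs but never the other way around. In that case it is enough to check whether the completed hourly run is the one for hour 23.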
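The dependency bookkeeping described above can be sketched as follows. The sketch assumes completion times are stored as epoch milliseconds per upstream job id, as `getReadyDependency()` does in the non-cycle path, and it uses `java.time` in place of `SimpleDateFormat`. `ReadyDependencies` and `completesTheDay` are hypothetical names. The `sameday` pruning mirrors the loop in successEventHandle. The hour-23 rule mirrors the last sentence of the paragraph, assuming the hour is read from the hourly run's statistic end time.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the ready-dependency bookkeeping in successEventHandle.
final class ReadyDependencies {
    private final Set<String> required;                     // jobs this job depends on
    private final Map<String, Long> ready = new HashMap<>(); // upstream jobId -> completion time (epoch ms)
    private final ZoneId zone;

    ReadyDependencies(Set<String> required, ZoneId zone) {
        this.required = new HashSet<>(required);
        this.zone = zone;
    }

    // Record a finished upstream job; ignore jobs we do not depend on.
    void record(String upstreamJobId, long finishedAtMillis) {
        if (required.contains(upstreamJobId)) {
            ready.put(upstreamJobId, finishedAtMillis);
        }
    }

    // DEPENDENCY_CYCLE = "sameday": forget dependencies that completed on another day.
    void pruneNotSameDay(long nowMillis) {
        LocalDate today = toDate(nowMillis);
        ready.values().removeIf(t -> !toDate(t).equals(today));
    }

    // The job may start once every dependency has a recorded completion.
    boolean allReady() {
        return ready.keySet().containsAll(required);
    }

    private LocalDate toDate(long millis) {
        return Instant.ofEpochMilli(millis).atZone(zone).toLocalDate();
    }

    // Daily job depending on an hourly job: only the run for hour 23 completes the day.
    static boolean completesTheDay(LocalDateTime hourlyStatisEndTime) {
        return hourlyStatisEndTime.getHour() == 23;
    }
}
```

The zone is passed in explicitly because "same day" depends on the time zone. The original code uses the JVM default zone implicitly through `SimpleDateFormat`.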

    Handling the JobFailedEvent:

    private void failedEventHandle(JobFailedEvent event) {
       JobDescriptor jobDescriptor = cache.getJobDescriptor();
       if (jobDescriptor == null) {
          autofix();
          return;
       }
       if (!jobDescriptor.getAuto()) {
          return;
       }
       if (jobDescriptor.getDependencies().contains(event.getJobId())) {// this job depends on the failed job
          if (event.getTriggerType() == TriggerType.SCHEDULE) {// the failed run was triggered by the scheduler
             // a job this job depends on has failed, so this job cannot run either; raise a failure
             ZeusJobException exception = new ZeusJobException(event
                   .getJobException().getCauseJobId(), "jobId:"
                   + jobDescriptor.getId() + " failed because its dependency Job:"
                   + event.getJobId() + " failed", event.getJobException());
             ScheduleInfoLog.info("jobId:" + jobId
                   + " failed because dependency jobId:"
                   + event.getJobId() + " failed");
             // record the failure in the job history
             JobHistory history = new JobHistory();
             history.setStartTime(new Date());
             history.setEndTime(new Date());
             history.setExecuteHost(null);
             history.setJobId(jobId);
             history.setTriggerType(event.getTriggerType());
             history.setStatus(Status.FAILED);
             history.getLog().appendZeusException(exception);
             history.setStatisEndTime(jobDescriptor.getStatisEndTime());
             history.setTimezone(jobDescriptor.getTimezone());
             history.setCycle(jobDescriptor.getCycle());
             history = jobHistoryManager.addJobHistory(history);
             jobHistoryManager.updateJobHistoryLog(history.getId(), history
                   .getLog().getContent());
             JobFailedEvent jfe = new JobFailedEvent(jobDescriptor.getId(),
                   event.getTriggerType(), history, exception);
             ScheduleInfoLog.info("JobId:" + jobId
                   + " is fail,dispatch the fail event");
             // broadcast the failure event
             context.getDispatcher().forwardEvent(jfe);
          }
       }
    }
    

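    When a job that this job depends on fails during a scheduled (SCHEDULE) run, this job automatically fails as well. It writes a FAILED history record and broadcasts its own JobFailedEvent, so the failure keeps propagating downstream. Failures of runs with any other trigger type are not propagated.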
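Each failed controller re-broadcasts its own JobFailedEvent with the same trigger type. As a result, one scheduled failure eventually fails every job downstream of it. The overall effect is a breadth-first walk over the reverse dependency graph, and the sketch below computes that set directly. The `downstream` map and the `FailurePropagation` class are hypothetical. The `scheduledRun` flag stands for the `TriggerType.SCHEDULE` check.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FailurePropagation {
    // downstream: jobId -> jobs that list it as a dependency (hypothetical input).
    // Returns every job that ends up FAILED because failedJobId failed.
    static Set<String> failedClosure(String failedJobId, boolean scheduledRun,
                                     Map<String, List<String>> downstream) {
        Set<String> failed = new LinkedHashSet<>();
        if (!scheduledRun) {
            return failed; // only SCHEDULE-triggered failures are propagated
        }
        Deque<String> queue = new ArrayDeque<>();
        queue.add(failedJobId);
        while (!queue.isEmpty()) {
            String job = queue.poll();
            for (String child : downstream.getOrDefault(job, Collections.emptyList())) {
                // a job reached through two parents is listed once here, although the
                // event-driven version may handle one failure event per failed parent
                if (failed.add(child)) {
                    queue.add(child);
                }
            }
        }
        return failed;
    }
}
```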

    Handling the ScheduleTriggerEvent:

    /**
     * Handles the event fired when a job's timer triggers it.
     *
     * @param event the schedule trigger event
     */
    private void triggerEventHandle(ScheduleTriggerEvent event) {
       String eId = event.getJobId();
       JobDescriptor jobDescriptor = cache.getJobDescriptor();
       if (jobDescriptor == null) {// the job has been deleted; this is an abnormal situation, so autofix
          autofix();
          return;
       }
       if (!eId.equals(jobDescriptor.getId())) {
          return;
       }
       ScheduleInfoLog.info("JobId:" + jobId
             + " receive a timer trigger event,statisTime is:"
             + jobDescriptor.getStatisEndTime());
       runJob(jobDescriptor);
    }
    

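    On receiving a timer trigger event, the controller starts the job only if the triggered job is this job; events for other jobs are ignored.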
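In Zeus the timer side is a Quartz `CronTrigger`. Its `TimerJob` receives the `jobId` and the dispatcher through its `JobDataMap` (see initializeEventHandle) and presumably forwards a ScheduleTriggerEvent. The sketch below replaces Quartz with the JDK's `ScheduledExecutorService` running at a fixed period instead of a cron expression, so that it stays self-contained. `FixedRateTrigger` and `TimerAwareController` are hypothetical names, and the controller only counts the runs where Zeus would call `runJob`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Stand-in for Quartz: publishes a trigger (just the jobId here) at a fixed rate.
final class FixedRateTrigger {
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

    ScheduledFuture<?> schedule(String jobId, long periodMillis, Consumer<String> forwardEvent) {
        return pool.scheduleAtFixedRate(() -> forwardEvent.accept(jobId),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        pool.shutdownNow();
    }
}

// Controller side of triggerEventHandle: run only when the trigger names this job.
final class TimerAwareController {
    final String jobId;
    final CountDownLatch runs;

    TimerAwareController(String jobId, int expectedRuns) {
        this.jobId = jobId;
        this.runs = new CountDownLatch(expectedRuns);
    }

    void onTrigger(String eventJobId) {
        if (!jobId.equals(eventJobId)) {
            return;          // another job's timer fired
        }
        runs.countDown();    // where Zeus would call runJob(jobDescriptor)
    }
}
```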

    Handling the JobMaintenanceEvent:

    private void maintenanceEventHandle(JobMaintenanceEvent event) {
       if (event.getType() == Events.UpdateJob
             && jobId.equals(event.getJobId())) {
          autofix();
       }
    }
    

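    On receiving an update-job event (Events.UpdateJob) for this job, the controller calls autofix to refresh the job's in-memory information.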
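Here is a minimal sketch of the refresh-on-update idea. It assumes the descriptor can be reloaded by job id from some store. `CachedJobDescriptor` and its `loader` function are hypothetical. Zeus's autofix may do more than a plain reload: it is also the path taken when the descriptor turns out to have been deleted.

```java
import java.util.function.Function;

// Hypothetical per-job cache that reloads only on an update event for its own job.
final class CachedJobDescriptor {
    private final String jobId;
    private final Function<String, String> loader; // jobId -> descriptor, e.g. a DB lookup
    private String descriptor;

    CachedJobDescriptor(String jobId, Function<String, String> loader) {
        this.jobId = jobId;
        this.loader = loader;
        this.descriptor = loader.apply(jobId);
    }

    String get() {
        return descriptor;
    }

    // Mirrors maintenanceEventHandle: ignore updates to other jobs.
    void onUpdateJob(String updatedJobId) {
        if (jobId.equals(updatedJobId)) {
            descriptor = loader.apply(jobId); // refresh the in-memory copy
        }
    }
}
```

The cache deliberately serves the stale copy until the event arrives. This is what makes the UpdateJob event necessary in the first place.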

    Handling the Initialize event:

    private void initializeEventHandle() {
       JobStatus jobStatus = groupManager.getJobStatus(jobId);
       if (jobStatus != null) {
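          // a RUNNING status at startup means the result of the last run was lost, so retry immediately
          if (jobStatus.getStatus() == Status.RUNNING) {
             log.error("jobId=" + jobId
                   + " is in RUNNING state, which means its status was lost; retrying immediately...");
             // look up the previous run's history and extract the job id from its log, so that job can be killed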
             String operator = null;
             if (jobStatus.getHistoryId() != null) {
                JobHistory history = jobHistoryManager
                      .findJobHistory(jobStatus.getHistoryId());
                // in rare cases the history lookup may return null
                if (history != null
                      && history.getStatus() == Status.RUNNING) {
                   operator = history.getOperator();
                   try {
                      JobContext temp = JobContext.getTempJobContext();
                      temp.setJobHistory(history);
                      new CancelHadoopJob(temp).run();
                   } catch (Exception e) {
                      // ignore
                   }
                }
             }
             JobHistory history = new JobHistory();
             history.setIllustrate("Found in RUNNING state at server startup; the status is considered lost, retrying");
             history.setOperator(operator);
             history.setTriggerType(TriggerType.MANUAL_RECOVER);
             history.setJobId(jobId);
             context.getJobHistoryManager().addJobHistory(history);
             master.run(history);
          }
       }
       JobDescriptor jd = cache.getJobDescriptor();
       // if this is a timed (Independent) job, start its timer
       if (jd.getAuto() && jd.getScheduleType() == JobScheduleType.Independent) {
          String cronExpression = jd.getCronExpression();
          try {
             CronTrigger trigger = new CronTrigger(jd.getId(), "zeus",
                   cronExpression);
             JobDetail detail = new JobDetail(jd.getId(), "zeus",
                   TimerJob.class);
             detail.getJobDataMap().put("jobId", jd.getId());
             detail.getJobDataMap().put("dispatcher",
                   context.getDispatcher());
             context.getScheduler().scheduleJob(detail, trigger);
          } catch (Exception e) {
             if (e instanceof SchedulerException
                   && "Based on configured schedule, the given trigger will never fire."
                         .equals(e.getMessage())) {
                // the trigger will never fire again, so turn off auto-scheduling for this job
                jd.setAuto(false);
                try {
                   groupManager.updateJob(jd.getOwner(), jd);
                } catch (ZeusException e1) {
                   log.error("JobId:" + jobId + " update failed", e1);
                }
                cache.refresh();
             } else {
                log.error("JobId:" + jobId + " failed to start the timer", e);
             }
          }
       }
       // for a cycle job with no dependencies, run it directly according to its start time
       if (jd.getAuto()
             && jd.getScheduleType() == JobScheduleType.CyleJob
             && (jd.getDependencies() == null || jd.getDependencies()
                   .isEmpty())) {
          initCycleJob(jd);
       }
    }
    
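At startup, initializeEventHandle makes a few independent decisions for each job. The sketch below models those decisions with hypothetical types: `StartupPlan`, its enums, and a `cronWillFire` flag that stands for Quartz's "will never fire" error. It returns the actions instead of performing them. It also leaves out the history records and the attempt to kill the lost Hadoop job.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of what initializeEventHandle decides for one job.
final class StartupPlan {
    enum ScheduleKind { INDEPENDENT, DEPENDENT, CYCLE }

    enum Action { RERUN_AS_MANUAL_RECOVER, SCHEDULE_CRON, DISABLE_AUTO, INIT_CYCLE_JOB }

    static List<Action> plan(boolean wasRunning, boolean auto, ScheduleKind kind,
                             boolean cronWillFire, List<String> dependencies) {
        List<Action> actions = new ArrayList<>();
        if (wasRunning) {
            // the last result was lost while the master was down; the real code also
            // tries to kill the old Hadoop job before re-running
            actions.add(Action.RERUN_AS_MANUAL_RECOVER);
        }
        if (auto && kind == ScheduleKind.INDEPENDENT) {
            // a cron expression that can never fire turns auto-scheduling off
            actions.add(cronWillFire ? Action.SCHEDULE_CRON : Action.DISABLE_AUTO);
        }
        if (auto && kind == ScheduleKind.CYCLE
                && (dependencies == null || dependencies.isEmpty())) {
            actions.add(Action.INIT_CYCLE_JOB);
        }
        return actions;
    }
}
```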

        Article link: https://www.haomeiwen.com/subject/vnhljttx.html