美文网首页
Oozie-Service-CoordMaterializeTr

Oozie-Service-CoordMaterializeTr

作者: PunyGod | 来源:发表于2016-07-04 11:37 被阅读100次

    功能:
    定期去扫描元数据表CoordinatorJobBean,将满足条件的Job实体化;

    // default is 300sec (5min)
    int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, lookupInterval);
    Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
    services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval,SchedulerService.Unit.SEC);
    

    CoordMaterializeTriggerRunnable:

    oozie里面的触发机制:

     1、启动一个定时执行器,每隔 schedulingInterval 时间运行一次 CoordMaterializeTriggerRunnable;
     2、查询CoordinatorJobBean根据时间和条数限制获取本次需要实例化的CoordinatorJobBean;
        技巧: 在CoordinatorJobBean中有个字段 nextMaterializedTimestamp 表明,在这个时间点之前的实例已经存在,为了尽可能的让任务的实例尽可能在触发之前
        实例就产生,而不是事后产生,在查询CoordinatorJobBean使用的时候不是使用当前时间,而是使用未来的一个时间: new Date().getTime() + lookupInterval * 1000
     3、异步去实例化 CoordinatorJobBean 下的 action;
     4、在实例化action的时候,生成的是窗口期内的一批实例,在类 CoordMaterializeTransitionXCommand中实现;
     5、loadState():加载 CoordinatorJobBean 信息,计算这次要实例化的时间窗口: startMatdTime 和 endMatdTime;
     6、materialize():根据不同的计算方式(cron、自定义)来在这个时间窗口内循环的产生action的实例;
     7、更新  CoordinatorJobBean 状态,记录 endMatdTime、lastActionNumber(时间累加)、状态置为running状态、如果jobEndTime< endMatdTime
        说明这个 CoordinatorJobBean 实例化工作已经结束,标记 job.setDoneMaterialization();
     8、performWrites():更新数据库
     9、notifyParent():通知上层结构 bundle;
    
    /** * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand. 
    */
    static class CoordMaterializeTriggerRunnable implements Runnable {
        private int materializationWindow;
        private int lookupInterval;
        private long delay = 0;
        private List<XCallable<Void>> callables;
        private List<XCallable<Void>> delayedCallables;
        private XLog LOG = XLog.getLog(getClass());
        public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
            this.materializationWindow = materializationWindow;
            this.lookupInterval = lookupInterval;
        }
        @Override
        public void run() {
            LockToken lock = null;
            // first check if there is some other running instance from the same service;
            try {
                lock = Services.get().get(MemoryLocksService.class)
                        .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);
                if (lock != null) {
                    runCoordJobMatLookup();
                    if (null != callables) {
                        boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
                        if (ret == false) {
                            XLog.getLog(getClass()).warn(
                                    "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
                                            + "Most possibly command queue is full. Queue size is :"
                                            + Services.get().get(CallableQueueService.class).queueSize());
                        }
                        callables = null;
                    }
                    if (null != delayedCallables) {
                        boolean ret = Services.get().get(CallableQueueService.class)
                                .queueSerial(delayedCallables, this.delay);
                        if (ret == false) {
                            XLog.getLog(getClass()).warn(
                                    "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
                                            + "Most possibly Callable queue is full. Queue size is :"
                                            + Services.get().get(CallableQueueService.class).queueSize());
                        }
                        delayedCallables = null;
                        this.delay = 0;
                    }
                }
                else {
                    LOG.debug("Can't obtain lock, skipping");
                }
            }
            catch (Exception e) {
                LOG.error("Exception", e);
            }
            finally {
                if (lock != null) {
                    lock.release();
                    LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName());
                }
            }
        }
        /**
         * Recover coordinator jobs that should be materialized
         * @throws JPAExecutorException
         */
        private void runCoordJobMatLookup() throws JPAExecutorException {
            List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();        XLog.Info.get().clear();
            XLog LOG = XLog.getLog(getClass());
            try {
                // get current date
                Date currDate = new Date(new Date().getTime() + lookupInterval * 1000);
                // get list of all jobs that have actions that should be materialized.
                int materializationLimit = ConfigurationService.getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT);
                materializeCoordJobs(currDate, materializationLimit, LOG, updateList);
            }
            catch (Exception ex) {
                LOG.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex);
            }
            finally {
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
            }
        }
        private void materializeCoordJobs(Date currDate, int limit, XLog LOG, List<UpdateEntry> updateList)
                throws JPAExecutorException {
            try {
                List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
                        CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit);
                LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
                        + ", Num jobs to materialize = " + materializeJobs.size());
                for (CoordinatorJobBean coordJob : materializeJobs) {
                    Services.get().get(InstrumentationService.class).get()
                            .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
                    queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
                    coordJob.setLastModifiedTime(new Date());
                    updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
                            coordJob));
                }
            }
            catch (JPAExecutorException jex) {
                LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
            }
        }
        /**
         * Adds callables to a list. If the number of callables in the list reaches {@link
         * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
         * is reset.
         *
         * @param callable the callable to queue. 
        */
        private void queueCallable(XCallable<Void> callable) {
            if (callables == null) {
                callables = new ArrayList<XCallable<Void>>();
            }
            callables.add(callable);
            if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
                if (ret == false) {
                    XLog.getLog(getClass()).warn(
                            "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
                                    + "Most possibly command queue is full. Queue size is :"
                                    + Services.get().get(CallableQueueService.class).queueSize());
                }
                callables = new ArrayList<XCallable<Void>>();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Oozie-Service-CoordMaterializeTr

          本文链接:https://www.haomeiwen.com/subject/ngbcjttx.html