美文网首页
Oozie-Service-ActionCheckerServi

Oozie-Service-ActionCheckerServi

作者: PunyGod | 来源:发表于2016-07-04 14:31 被阅读65次

功能:
定期去检查action的状态,包含coord的 和wf的;

/**
 * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
 * queue Action checks.
 */

static class ActionCheckRunnable implements Runnable {
    private int actionCheckDelay;
    private List<XCallable<Void>> callables;
    private StringBuilder msg = null;
    public ActionCheckRunnable(int actionCheckDelay) {
        this.actionCheckDelay = actionCheckDelay;
    }
    public void run() {
        XLog.Info.get().clear();
        XLog LOG = XLog.getLog(getClass());
        msg = new StringBuilder();
        try {
            runWFActionCheck();
            runCoordActionCheck();
        }
        catch (CommandException ce) {
            LOG.error("Unable to run action checks, ", ce);
        }
        LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
        if (null != callables) {
            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
            if (ret == false) {
                LOG.warn("Unable to queue the callables commands for CheckerService. "
                        + "Most possibly command queue is full. Queue size is :"                        + Services.get().get(CallableQueueService.class).queueSize());
            }
            callables = null;
        }
    }
    /**
     * check workflow actions
     *
     * @throws CommandException
     */
    private void runWFActionCheck() throws CommandException {
        JPAService jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        List<WorkflowActionBean> actions;
        try {
            actions = WorkflowActionQueryExecutor.getInstance().getList(WorkflowActionQuery.GET_RUNNING_ACTIONS,
                    actionCheckDelay);
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        if (actions == null || actions.isEmpty()) {
            return;
        }
        List<String> actionIds = toIds(actions);
        try {
            actionIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(actionIds);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E1700, ex.getMessage(), ex);
        }
        msg.append(" WF_ACTIONS : ").append(actionIds.size());
        for (String actionId : actionIds) {
            Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
                    INSTR_CHECK_ACTIONS_COUNTER, 1);
                queueCallable(new ActionCheckXCommand(actionId));
        }
    }
    /**
     * check coordinator actions
     *
     * @throws CommandException
     */
    private void runCoordActionCheck() throws CommandException {
        JPAService jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        List<String> cactionIds;
        try {
            cactionIds = jpaService.execute(new CoordActionsRunningGetJPAExecutor(
                    actionCheckDelay));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        if (cactionIds == null || cactionIds.isEmpty()) {
            return;
        }
        try {
            cactionIds = Services.get().get(JobsConcurrencyService.class).getJobIdsForThisServer(cactionIds);
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E1700, ex.getMessage(), ex);
        }
        msg.append(" COORD_ACTIONS : ").append(cactionIds.size());
        for (String coordActionId : cactionIds) {
            Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
                    INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
                queueCallable(new CoordActionCheckXCommand(coordActionId, actionCheckDelay));
        }
    }
    /**
     * Adds callables to a list. If the number of callables in the list
     * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
     * entire batch is queued and the callables list is reset.
     *
     * @param callable the callable to queue.
     */
    private void queueCallable(XCallable<Void> callable) {
        if (callables == null) {
            callables = new ArrayList<XCallable<Void>>();
        }
        callables.add(callable);
        if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {
            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
            if (ret == false) {
                XLog.getLog(getClass()).warn(
                        "Unable to queue the callables commands for CheckerService. "
                                + "Most possibly command queue is full. Queue size is :"
                                + Services.get().get(CallableQueueService.class).queueSize());
            }
            callables = new ArrayList<XCallable<Void>>();
        }
    }
    private List<String> toIds(List<WorkflowActionBean> actions) {
        List<String> ids = new ArrayList<String>(actions.size());
        for (WorkflowActionBean action : actions) {
            ids.add(action.getId());
        }
        return ids;
    }
}

定时调度的主要工作就是对于处于running状态超过一个时间的任务,不管是wf的还是coord的任务,异步发起action check的命令,来检查任务的状态。

相关文章

  • Oozie-Service-ActionCheckerServi

    功能:定期去检查action的状态,包含coord的 和wf的; 定时调度的主要工作就是对于处于running状态...

网友评论

      本文标题:Oozie-Service-ActionCheckerServi

      本文链接:https://www.haomeiwen.com/subject/mcicjttx.html