美文网首页
Oozie-command

Oozie-command

作者: PunyGod | 来源:发表于2016-06-29 16:31 被阅读129次

    从前面几篇组件介绍,希望用户对oozie的组件有个粗略的概念,用户需要完成的业务逻辑将会被封装成为wf、coord、bundle,作为一个调度系统,oozie需要为用户做到什么呢,oozie需要按照用户的设地在合适的时候对用户的wf、coord、bundle进行操作,可能的操作包括 启动、停止、杀死、重跑、挂起、恢复、结束等等,本篇主要介绍oozie的命令系统;

    /** * Extends Callable adding the concept of priority. <p/> The priority is useful when queuing callables for later 
    * execution via the {@link org.apache.oozie.service.CallableQueueService}. <p/> A higher number means a higher 
    * priority. <p/>
    */
    public interface XCallable<T> extends Callable<T> {
    
    /** * Base class for synchronous and asynchronous commands. 
    * <p/> 
    * It enables by API the following pattern: 
    * <p/>
    * <ul> 
    * <li>single execution: a command instance can be executed only once</li> 
    * <li>eager data loading: loads data for eager precondition check</li>
    * <li>eager precondition check: verify precondition before obtaining lock</li> 
    * <li>data loading: loads data for precondition check and execution</li>
    * <li>precondition check: verifies precondition for execution is still met</li> 
    * <li>locking: obtains exclusive lock on key before executing the command</li> 
    * <li>execution: command logic</li> 
    * </ul> 
    * <p/> 
    * It has built in instrumentation and logging. 
    */
    public abstract class XCommand<T> implements XCallable<T> {
    

    oozie将所有的命令抽象出一层 XCommand ,命令根据不同的场景需要同步执行或者异步执行,当进行异步执行的时候,还引入了优先级的概念来排列命令的执行计划;

    /** * Implements the XCommand life-cycle. 
    *
    * @return the {link #execute} return value. 
    * @throws Exception thrown if the command could not be executed. */
    @Override
    public final T call() throws CommandException {
        setLogInfo();
        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
            LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
            return null;
        }
        commandQueue = null; 
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
        Instrumentation.Cron callCron = new Instrumentation.Cron();
        try {
            callCron.start();
            if (!isSynchronous) {
                eagerLoadState();
                eagerVerifyPrecondition();
            }
            try {
                T ret = null;
                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
                    Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                    acquireLockCron.start();
                    acquireLock();
                    acquireLockCron.stop();
                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
                } 
               // executing interrupts only in case of the lock required commands
                if (lock != null) {
                    this.executeInterrupts();
                } 
               if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) { 
                   if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
                            && !used.compareAndSet(false, true)) {  
                      LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(), this.toString());
                        return null;
                    }
                    LOG.trace("Load state for [{0}]", getEntityKey()); 
                   loadState();
                   LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
                    verifyPrecondition();
                    LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
                    Instrumentation.Cron executeCron = new Instrumentation.Cron();
                    executeCron.start();
                    ret = execute();
                    executeCron.stop();
                    instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
                }
                if (commandQueue != null) {
                    CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
                    for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
                        LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
                        if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
                            LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
                                    .size(), entry.getKey());
                       }
                    }
                }
                return ret;
            }
            finally {
                if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) { 
                   releaseLock();
               }
            }
        }
        catch(PreconditionException pex){ 
           LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
            return null;
        }
        catch (XException ex) {
            LOG.error("XException, ", ex);
            instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
            if (ex instanceof CommandException) { 
               throw (CommandException) ex;
            }
            else {
                throw new CommandException(ex);
            }
        }
        catch (Exception ex) {
            LOG.error("Exception, ", ex); 
           instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1); 
           throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
        } 
       catch (Error er) {
            LOG.error("Error, ", er);  
          instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1); 
           throw er;
        }
        finally {
            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
            callCron.stop(); 
           instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
        }}
    
    

    如上是 所有命令的call方法:如果是异步执行命令时候,再执行命令的时候 需要
    先来测试一下此刻的状态是否还有执行命令的必要;所有的call方法在执行 具体的 execute() 时候,都需要装载需要操作的信息,用于更改数据库信息。如果对某个实例的操作不能同时进行,在执行命令之前还需要去获取锁,来保证某一时刻的操作是唯一的。很多情况下,一个命令会衍生出子命令,比如说杀死一个bundle的时候,我需要将bundle的状态置为杀死之外,还要去发送杀死bundle中的coord杀死的命令,这个就是一个命令产生子命令的场景,这个时候,我们也需要将这些子命令加入到异步命令执行池中去直接。

    不同的命令逻辑的执行区别主要体现在不同的子类中的 execute() 中。

    diagram.png

    图中是oozie的整个包含不同业务色彩的命令系统。

    相关文章

      网友评论

          本文标题:Oozie-command

          本文链接:https://www.haomeiwen.com/subject/rdkxjttx.html