美文网首页
nifi源码剖析-Processor调用处理过程

nifi源码剖析-Processor调用处理过程

作者: 皮皮猿的博客 | 来源:发表于2017-06-15 19:24 被阅读0次

    从AbstractProcessor方法开始

    public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {
        @Override
        public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
            final ProcessSession session = sessionFactory.createSession(); // 创建StandardProcessSession
            try {
                onTrigger(context, session);
                session.commit();  // checkout + commit 
            } catch (final Throwable t) {
                getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
                session.rollback(true);
                throw t;
            }
        }
        // 具体到Processor的onTrigger方法
        public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
    }
    

    首先进入每个的Processor的onTrigger()方法,该方法里先会调用session.write()方法,然后调用session.transfer方法
    write方法里构建流

    StandardFlowFileQueue: 队列的putAll方法

    调度的循环开始: TimerDrivenSchedulingAgent->doSchedule()方法(只是一种策略)

    FlowController里设置调度策略和对应agent的关系的map(StandardProcessScheduler)

    相关文章

      网友评论

          本文标题:nifi源码剖析-Processor调用处理过程

          本文链接:https://www.haomeiwen.com/subject/mbyuqxtx.html