美文网首页
nifi源码剖析-Processor调用处理过程

nifi源码剖析-Processor调用处理过程

作者: 皮皮猿的博客 | 来源:发表于2017-06-15 19:24 被阅读0次

从AbstractProcessor方法开始

public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {
    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession(); // 创建StandardProcessSession
        try {
            onTrigger(context, session);
            session.commit();  // checkout + commit 
        } catch (final Throwable t) {
            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
            session.rollback(true);
            throw t;
        }
    }
    // 具体到Processor的onTrigger方法
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}

首先进入每个的Processor的onTrigger()方法,该方法里先会调用session.write()方法,然后调用session.transfer方法
write方法里构建流

StandardFlowFileQueue: 队列的putAll方法

调度的循环开始: TimerDrivenSchedulingAgent->doSchedule()方法(只是一种策略)

FlowController里设置调度策略和对应agent的关系的map(StandardProcessScheduler)

相关文章

网友评论

      本文标题:nifi源码剖析-Processor调用处理过程

      本文链接:https://www.haomeiwen.com/subject/mbyuqxtx.html