从AbstractProcessor方法开始
public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final ProcessSession session = sessionFactory.createSession(); // 创建StandardProcessSession
try {
onTrigger(context, session);
session.commit(); // checkout + commit
} catch (final Throwable t) {
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
session.rollback(true);
throw t;
}
}
// 具体到Processor的onTrigger方法
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}
首先进入每个的Processor的onTrigger()方法,该方法里先会调用session.write()方法,然后调用session.transfer方法
write方法里构建流
StandardFlowFileQueue: 队列的putAll方法
调度的循环开始: TimerDrivenSchedulingAgent->doSchedule()方法(只是一种策略)
FlowController里设置调度策略和对应agent的关系的map(StandardProcessScheduler)
网友评论