1 EventProcessor 简介
Disruptor的消费者依赖EventProcessor循环处理可用事件。EventProcessor,事件处理器
handle和process都可以翻译为“处理”
但是process侧重于机器的处理
而handle侧重于有人工的处理
所以使用handle表示用户逻辑的处理
使用process表示机器的处理
该接口主要有两个实现类

- WorkProcessor
- BatchEventProcessor
它们对应的逻辑处理消费者分别是
- EventHandler
- WorkHandler
EventProcessor的接口定义
EventProcessor需要是一个runnable的实现,它将使用适当的等待策略从{@link RingBuffer}轮询事件。
不太可能需要自己实现此接口。在第一个实例中使用{@link EventHandler}接口以及预先提供的BatchEventProcessor
事件执行器,等待RingBuffer有可用消费事件。一个事件处理器关联一个执行线程
public interface EventProcessor extends Runnable {
/**
*获取对此{@link EventProcessor}使用的{@link Sequence}的引用。
*
* @return对{@link EventProcessor}的{@link Sequence}的引用
*/
Sequence getSequence();
/**
* 发出此事件处理器在下一次干净休息时消耗完后应该停止的信号。
* 它将调用{@link SequenceBarrier#alert()}来通知线程检查状态。
*/
void halt();
boolean isRunning();
}
EventProcessor接口继承了Runnable接口,主要有两种实现
- 单线程批量处理BatchEventProcessor
- 多线程处理WorkProcessor
2 使用BatchEventProcessor单线程批处理事件
在使用BatchEventProcessor时,通过Disruptor#handleEventsWith方法可以获取一个EventHandlerGroup,再通过EventHandlerGroup的and和then方法可以构建一个复杂的消费者链。EventHandlerGroup表示一组事件消费者,内部持有了Disruptor类实例disruptor,其大部分功能都是通过调用disruptor实现,其实可以算作是Disruptor这个辅助类的一部分。
- EventHandlerGroup.java
设置批处理程序以使用环形缓冲区中的事件。 这些处理程序仅在此组中的每个{@link EventProcessor}处理完事件后处理事件。
该方法通常用作链的一部分。 例如,如果处理程序A必须在处理程序B dw.handleEventsWith(A)之前处理事件。那么(B)
@param处理将处理事件的批处理程序。
@return {@link EventHandlerGroup},可用于在创建的事件处理器上设置事件处理器障碍。
![]()
设置批处理程序以处理来自环形缓冲区的事件。
这些处理程序仅在此组中的每个{@link EventProcessor}处理完事件后处理事件。
该方法通常用作链的一部分。 例如,如果A必须在Bdw.after(A).handleEventsWith(B)之前处理事件
@param处理将处理事件的批处理程序。
@return {@link EventHandlerGroup},可用于在创建的事件处理器上设置事件处理器障碍。
![]()
// 由EventHandlerGroup调用时,barrierSequences是EventHandlerGroup实例的序列,
//也就是上一个事件处理者组的序列,作为当前事件处理的门控,防止后边的消费链超前
// 如果第一次调用handleEventsWith,则barrierSequences为空
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers) {
checkNotStarted();
// 对应此事件处理器组的序列组
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null) {
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
// 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加
// 所谓门控,是指后续消费链的消费,不能超过前边
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
// 为消费链下一组消费者,更新门控序列
// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值
// processorSequences是本次要设置的事件处理器组的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
// 将本组序列添加到Sequencer中的gatingSequences中
ringBuffer.addGatingSequences(processorSequences);
// 将上组序列从Sequencer中的gatingSequences中,gatingSequences一直保存消费链末端消费者的序列组
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
// 取消标记上一组消费者为消费链末端
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
可以看到,使用BatchEventProcessor构建消费者链时的逻辑都在createEventProcessors
方法中。
ConsumerRepository 类主要保存消费者的各种关系,如通过EventHandler引用获取EventProcessorInfo信息,通过Sequence获取ConsumerInfo信息等。因为要使用引用做key,所以数据结构使用IdentityHashMap。
IdentityHashMap和HashMap最大的不同,就是使用==而不是equals比较key。
这个createEventProcessors方法接收两个参数,barrierSequences表示当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组。createEventProcessors方法的另一个参数eventHandlers,这个参数是代表事件消费逻辑的EventHandler数组。
Disruptor为每个EventHandler实现类都创建了一个对应的BatchEventProcessor。
在构建BatchEventProcessor时需要以下传入三个构造参数:dataProvider是数据存储结构如RingBuffer;sequenceBarrier用于跟踪生产者游标,协调数据处理;eventHandler是用户实现的事件处理器,也就是实际的消费者。
注意,Disruptor并非为每个BatchEventProcessor都创建一个新的SequenceBarrier,而是每个消费者组共用一个SequenceBarrier。
BatchEventProcessor定义如下。至于为什么要叫做BatchEventProcessor,可以看看在run()方法里每次waitFor获取的availableSequence是当前能够使用的最大值,然后再循环处理这些数据。这样当消费者有瞬时抖动,导致暂时落后生产者时,可在下一次循环中,批量处理所有落后的事件。
网友评论