美文网首页程序员Disruptor
EventProcessor核心架构设计与底层源码深度分析

EventProcessor核心架构设计与底层源码深度分析

作者: JavaEdge | 来源:发表于2019-06-21 01:31 被阅读6次

1 EventProcessor 简介

Disruptor的消费者依赖EventProcessor循环处理可用事件。EventProcessor,事件处理器

handle和process都可以翻译为“处理”
但是process侧重于机器的处理
而handle侧重于有人工的处理
所以使用handle表示用户逻辑的处理
使用process表示机器的处理

该接口主要有两个实现类


  • WorkProcessor
  • BatchEventProcessor

它们对应的逻辑处理消费者分别是

  • EventHandler
  • WorkHandler

EventProcessor的接口定义

EventProcessor需要是一个runnable的实现,它将使用适当的等待策略从{@link RingBuffer}轮询事件。 
不太可能需要自己实现此接口。在第一个实例中使用{@link EventHandler}接口以及预先提供的BatchEventProcessor
事件执行器,等待RingBuffer有可用消费事件。一个事件处理器关联一个执行线程
public interface EventProcessor extends Runnable {
    /**
     *获取对此{@link EventProcessor}使用的{@link Sequence}的引用。
     *
     * @return对{@link EventProcessor}的{@link Sequence}的引用
     */
    Sequence getSequence();

    /**
     * 发出此事件处理器在下一次干净休息时消耗完后应该停止的信号。
     * 它将调用{@link SequenceBarrier#alert()}来通知线程检查状态。
     */
    void halt();

    boolean isRunning();
}

EventProcessor接口继承了Runnable接口,主要有两种实现

  • 单线程批量处理BatchEventProcessor
  • 多线程处理WorkProcessor

2 使用BatchEventProcessor单线程批处理事件

在使用BatchEventProcessor时,通过Disruptor#handleEventsWith方法可以获取一个EventHandlerGroup,再通过EventHandlerGroup的and和then方法可以构建一个复杂的消费者链。EventHandlerGroup表示一组事件消费者,内部持有了Disruptor类实例disruptor,其大部分功能都是通过调用disruptor实现,其实可以算作是Disruptor这个辅助类的一部分。

  • EventHandlerGroup.java

设置批处理程序以使用环形缓冲区中的事件。 这些处理程序仅在此组中的每个{@link EventProcessor}处理完事件后处理事件。
该方法通常用作链的一部分。 例如,如果处理程序A必须在处理程序B dw.handleEventsWith(A)之前处理事件。那么(B)
@param处理将处理事件的批处理程序。
@return {@link EventHandlerGroup},可用于在创建的事件处理器上设置事件处理器障碍。


设置批处理程序以处理来自环形缓冲区的事件。
这些处理程序仅在此组中的每个{@link EventProcessor}处理完事件后处理事件。
该方法通常用作链的一部分。 例如,如果A必须在Bdw.after(A).handleEventsWith(B)之前处理事件
@param处理将处理事件的批处理程序。
@return {@link EventHandlerGroup},可用于在创建的事件处理器上设置事件处理器障碍。


// 由EventHandlerGroup调用时,barrierSequences是EventHandlerGroup实例的序列,
//也就是上一个事件处理者组的序列,作为当前事件处理的门控,防止后边的消费链超前
// 如果第一次调用handleEventsWith,则barrierSequences为空
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                           final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();

        
        // 对应此事件处理器组的序列组
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
     
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        // 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加
        // 所谓门控,是指后续消费链的消费,不能超过前边
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

// 为消费链下一组消费者,更新门控序列
// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值
// processorSequences是本次要设置的事件处理器组的序列
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            // 将本组序列添加到Sequencer中的gatingSequences中
            ringBuffer.addGatingSequences(processorSequences);
             // 将上组序列从Sequencer中的gatingSequences中,gatingSequences一直保存消费链末端消费者的序列组
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            // 取消标记上一组消费者为消费链末端
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

可以看到,使用BatchEventProcessor构建消费者链时的逻辑都在createEventProcessors方法中。

ConsumerRepository 类主要保存消费者的各种关系,如通过EventHandler引用获取EventProcessorInfo信息,通过Sequence获取ConsumerInfo信息等。因为要使用引用做key,所以数据结构使用IdentityHashMap。
IdentityHashMap和HashMap最大的不同,就是使用==而不是equals比较key。
这个createEventProcessors方法接收两个参数,barrierSequences表示当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组。createEventProcessors方法的另一个参数eventHandlers,这个参数是代表事件消费逻辑的EventHandler数组。
Disruptor为每个EventHandler实现类都创建了一个对应的BatchEventProcessor。
在构建BatchEventProcessor时需要以下传入三个构造参数:dataProvider是数据存储结构如RingBuffer;sequenceBarrier用于跟踪生产者游标,协调数据处理;eventHandler是用户实现的事件处理器,也就是实际的消费者。
注意,Disruptor并非为每个BatchEventProcessor都创建一个新的SequenceBarrier,而是每个消费者组共用一个SequenceBarrier。
BatchEventProcessor定义如下。至于为什么要叫做BatchEventProcessor,可以看看在run()方法里每次waitFor获取的availableSequence是当前能够使用的最大值,然后再循环处理这些数据。这样当消费者有瞬时抖动,导致暂时落后生产者时,可在下一次循环中,批量处理所有落后的事件。

参考

解读Disruptor系列--解读源码(3)之消费者

相关文章

网友评论

    本文标题:EventProcessor核心架构设计与底层源码深度分析

    本文链接:https://www.haomeiwen.com/subject/tepsqctx.html