美文网首页java收集解读Disruptor系列
解读Disruptor系列--解读源码(3)之消费者

解读Disruptor系列--解读源码(3)之消费者

作者: coder_jerry | 来源:发表于2017-09-29 14:27 被阅读665次

    之前我们已经熟悉了Disruptor的启动和事件生产操作,接下来我们一同探究Disruptor如何消费事件。

    0x00 概念回顾

    我们先回顾下Disruptor消费相关的名词概念:
    Event: Disruptor中传输的事件。
    RingBuffer: 存储和更新事件的容器。
    EventHandler: 用户实现接口,包含消费处理逻辑,代表Disruptor一个消费者。
    EventProcessor: EventProcessor继承了Runnable接口,包含处理Disruptor事件的主循环。

    多播事件: 队列和Disruptor在表现行为上最大的区别。队列中的一个事件只能被一个消费者消费,而Disruptor中的事件会发布给所有消费者。特别适合同一数据的独立并行处理操作。
    消费者依赖图(消费链):同一事件需要被多个消费者消费时,消费者之间可能有依赖关系,如消费者A,B,C,B和C依赖A先执行,但是B和C可以并行消费。

    0x01 EventProcessor接口概览

    OK,咱们正式开始对Disruptor消费者的源码解读。
    Disruptor的消费者依赖EventProcessor循环处理可用事件。EventProcessor顾名思义,就是事件处理器(handle和process都可以翻译为“处理”,但是process侧重于机器的处理,而handle侧重于有人工的处理,所以使用handle表示用户逻辑的处理,使用process表示机器的处理),这个接口有两个实现类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理消费者分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口定义。

    image.png
    /**
    * Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
    *
    * @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
    * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
    * 处理事件的回调接口
    */
    public interface EventHandler<T>
    {
        /**
        * Called when a publisher has published an event to the {@link RingBuffer}
        *
        * @param event      published to the {@link RingBuffer}
        * @param sequence  of the event being processed
        * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
        * @throws Exception if the EventHandler would like the exception handled further up the chain.
        */
        void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
    }
    
    /**
    * EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
    * <p>
    * An EventProcessor will generally be associated with a Thread for execution.
    * 事件执行器,等待RingBuffer有可用消费事件。一个事件处理器关联一个执行线程
    */
    public interface EventProcessor extends Runnable
    {
        /**
        * Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
        *
        * @return reference to the {@link Sequence} for this {@link EventProcessor}
        */
        Sequence getSequence();
    
        /**
        * Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
        * It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
        */
        void halt();
    
        boolean isRunning();
    }
    

    EventProcessor接口继承了Runnable接口,主要有两种实现:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor
    在使用Disruptor帮助类构建消费者时,使用handleEventsWith方法传入多个EventHandler,内部使用多个BatchEventProcessor关联多个线程执行。这种情况类似JMS中的发布订阅模式,同一事件会被多个消费者并行消费。适用于同一事件触发多种操作。
    而使用Disruptor的handleEventsWithWorkerPool传入多个WorkHandler时,内部使用多个WorkProcessor关联多个线程执行。这种情况类似JMS的点对点模式,同一事件会被一组消费者其中之一消费。适用于提升消费者并行处理能力。

    0x02 消费技术实现

    我们先回顾下Disruptor消费者的两个特点:消费者依赖图(即下文所谓的“消费链”)和事件多播。
    假设现在有A,B,C,D四个消费者,它们都能组成什么样的形式呢?从众多的排列组合中,我挑了4组比较有代表性的消费链形式。

    image.png
    • 第1组中,消费者A消费按成后,B、C、D可同时消费;
    • 第2组中,消费者A、B、C、D顺序消费;
    • 第3组中,消费者A、B顺序消费后,C、D同时消费;
    • 第4组中,消费者A在消费完成后,B和C可以同时消费,但是必须在都消费完成后,D才能消费。

    标号为1、3、4的消费链都使用了事件多播,可见事件多播属于消费链的一种组合形式。注意,在上面4种组合中,每个组合的每一水平行,都属于一个消费者组。
    这些还只是较为简单的消费链组成,实际中消费链可能会更复杂。
    那么在Disruptor内部是怎么实现消费链的呢?
    我们可以先思考下。如果想把独立的消费者组成消费链,那么后方的消费者(组)必然要知道在它前方的消费者(组)的处理情况,否则就做不到顺序消费。同时,消费者也要了解生产者的位置,来判断是否有可用事件。之前我们分析生产者代码的时候,已经讲过,生产者为了不覆盖没有消费完全的事件,必须知道最慢消费者的处理情况
    做到了这些才会有能力去控制消费者组成消费链。下面让我们具体看Disruptor中的实现。

    0x02.1 使用BatchEventProcessor单线程批处理事件

    在使用BatchEventProcessor时,通过Disruptor#handleEventsWith方法可以获取一个EventHandlerGroup,再通过EventHandlerGroup的and和then方法可以构建一个复杂的消费者链。EventHandlerGroup表示一组事件消费者,内部持有了Disruptor类实例disruptor,其大部分功能都是通过调用disruptor实现,其实可以算作是Disruptor这个辅助类的一部分。

    // EventHandlerGroup.java
    public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
    {
        return handleEventsWith(handlers);
    }
    
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return disruptor.createEventProcessors(sequences, handlers);
    }
    
    // Disruptor.java
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);
    }
    
    // 由EventHandlerGroup调用时,barrierSequences是EventHandlerGroup实例的序列,也就是上一个事件处理者组的序列,作为当前事件处理的门控,防止后边的消费链超前
    // 如果是第一次调用handleEventsWith,则barrierSequences是一个空数组
    EventHandlerGroup<T> **createEventProcessors**(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();
        // 对应此事件处理器组的序列组
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            // 批量处理事件的循环
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
    
            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
    
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        // 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加。(所谓门控,是指后续消费链的消费,不能超过前边。)
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    
        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }
    
    // 为消费链下一组消费者,更新门控序列
    // barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值
    // processorSequences是本次要设置的事件处理器组的序列
    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            ringBuffer.addGatingSequences(processorSequences); // 将本组序列添加到Sequencer中的gatingSequences中
            for (final Sequence barrierSequence : barrierSequences) // 将上组序列从Sequencer中的gatingSequences中,gatingSequences一直保存消费链末端消费者的序列组
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); // 取消标记上一组消费者为消费链末端
        }
    }
    

    可以看到,使用BatchEventProcessor构建消费者链时的逻辑都在createEventProcessors这个方法中。
    先简单说下ConsumerRepository,这个类主要保存消费者的各种关系,如通过EventHandler引用获取EventProcessorInfo信息,通过Sequence获取ConsumerInfo信息等。因为要使用引用做key,所以数据结构使用IdentityHashMapIdentityHashMap
    和HashMap最大的不同,就是使用==而不是equals比较key
    这个createEventProcessors方法接收两个参数,barrierSequences表示当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组。createEventProcessors方法的另一个参数eventHandlers,这个参数是代表事件消费逻辑的EventHandler数组。
    Disruptor为每个EventHandler实现类都创建了一个对应的BatchEventProcessor。
    在构建BatchEventProcessor时需要以下传入三个构造参数:dataProvider是数据存储结构如RingBuffer;sequenceBarrier用于跟踪生产者游标,协调数据处理;eventHandler是用户实现的事件处理器,也就是实际的消费者。
    注意,Disruptor并非为每个BatchEventProcessor都创建一个新的SequenceBarrier,而是每个消费者组共用一个SequenceBarrier
    BatchEventProcessor定义如下。至于为什么要叫做BatchEventProcessor,可以看看在run()方法里每次waitFor获取的availableSequence是当前能够使用的最大值,然后再循环处理这些数据。这样当消费者有瞬时抖动,导致暂时落后生产者时,可在下一次循环中,批量处理所有落后的事件。

    /**
    * Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
    * and delegating the available events to an {@link EventHandler}.
    * <p>
    * If the {@link EventHandler} also implements {@link LifecycleAware} it will be notified just after the thread
    * is started and just before the thread is shutdown.
    *
    * @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
    *
    * 每个EventHandler对应一个EventProcessor执行者,BatchEventProcessor每次大循环可以获取最高可用序号,并循环调用EventHandler
    */
    public final class BatchEventProcessor<T>
        implements EventProcessor
    {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
        private final DataProvider<T> dataProvider; // 数据提供者,默认是RingBuffer,也可替换为自己的数据结构
        private final SequenceBarrier sequenceBarrier; // 默认为ProcessingSequenceBarrier
        private final EventHandler<? super T> eventHandler; // 此EventProcessor对应的用户自定义的EventHandler实现
        private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 当前执行位置
        private final TimeoutHandler timeoutHandler;
        private final BatchStartAware batchStartAware; // 每次循环取得一批可用事件后,在实际处理前调用
    
        /**
        * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
        * the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
        *
        * @param dataProvider    to which events are published.
        * @param sequenceBarrier on which it is waiting.
        * @param eventHandler    is the delegate to which events are dispatched.
        */
        public BatchEventProcessor(
            final DataProvider<T> dataProvider,
            final SequenceBarrier sequenceBarrier,
            final EventHandler<? super T> eventHandler)
        {
            this.dataProvider = dataProvider;
            this.sequenceBarrier = sequenceBarrier;
            this.eventHandler = eventHandler;
    
            if (eventHandler instanceof SequenceReportingEventHandler)
            {
                ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
            }
    
            batchStartAware =
                    (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
            timeoutHandler =
                    (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
        }
    
        // ... 省略部分代码
    
        /**
        * It is ok to have another thread rerun this method after a halt().
        *
        * @throws IllegalStateException if this object instance is already running in a thread
        */
        @Override
        public void run()
        {
            if (!running.compareAndSet(false, true))
            {
                throw new IllegalStateException("Thread is already running");
            }
            sequenceBarrier.clearAlert();
    
            notifyStart();
    
            T event = null;
            long nextSequence = sequence.get() + 1L;
            try
            {
                while (true)
                {
                    try
                    {  // availableSequence返回的是可用的最大值
                        final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 使用给定的等待策略去等待下一个序列可用
                        if (batchStartAware != null)
                        {
                            batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                        }
                        // 批处理在此处得以体现
                        while (nextSequence <= availableSequence)
                        {
                            event = dataProvider.get(nextSequence);
                            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                            nextSequence++;
                        }
                        // eventHandler处理完毕后,更新当前序号
                        sequence.set(availableSequence);
                    }
                    catch (final TimeoutException e)
                    {
                        notifyTimeout(sequence.get());
                    }
                    catch (final AlertException ex)
                    {
                        if (!running.get())
                        {
                            break;
                        }
                    }
                    catch (final Throwable ex)
                    {
                        exceptionHandler.handleEventException(ex, nextSequence, event);
                        sequence.set(nextSequence);
                        nextSequence++;
                    }
                }
            }
            finally
            {
                notifyShutdown();
                running.set(false);
            }
        }
    
    }
    

    0x02.2 消费者可用序列屏障-SequenceBarrier

    我们重点看一下SequenceBarrier,可直译为“序列屏障”。SequenceBarrier的主要作用是协调获取消费者可处理到的最大序号,内部持有着生产者和其依赖的消费者序列。它的接口定义如下。

    public interface SequenceBarrier
    {
        /**
        * Wait for the given sequence to be available for consumption.<br>
        * 等待指定序列可用
        * @param sequence to wait for
        * @return the sequence up to which is available
        * @throws AlertException      if a status change has occurred for the Disruptor
        * @throws InterruptedException if the thread needs awaking on a condition variable.
        * @throws TimeoutException
        *
        */
        long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
    
        /**
        * Get the current cursor value that can be read.<br>
        * 获取当前可读游标值
        *
        * @return value of the cursor for entries that have been published.
        *
        */
        long getCursor();
    
        /**
        * The current alert status for the barrier.<br>
        * 当前的alert状态
        *
        * @return true if in alert otherwise false.
        */
        boolean isAlerted();
    
        /**
        * Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.<br>
        *
        * 通知消费者状态变化。当调用EventProcessor#halt()将调用此方法。
        */
        void alert();
    
        /**
        * Clear the current alert status.<br>
        * 清楚alert状态
        */
        void clearAlert();
    
        /**
        * Check if an alert has been raised and throw an {@link AlertException} if it has.
        * 检查是否发生alert,发生将抛出异常
        * @throws AlertException if alert has been raised.
        */
        void checkAlert() throws AlertException;
    }
    

    SequenceBarrier实例引用被EventProcessor持有,用于等待并获取可用的消费事件,主要体现在waitFor这个方法。
    要实现这个功能,需要3点条件:

    1. 知道生产者的位置。
    2. 因为Disruptor支持消费者链,在不同的消费者组之间,要保证后边的消 费者组只有在前消费者组中的消费者都处理完毕后,才能进行处理。
    3. 暂时没有事件可消费,在等待可用消费时,还需要使用某种等待策略进行等待。

    看下SequenceBarrier实现类ProcessingSequenceBarrier的代码是如何实现waitFor方法。

    final class ProcessingSequenceBarrier implements SequenceBarrier
    {
        private final WaitStrategy waitStrategy; // 等待可用消费时,指定的等待策略
        private final Sequence dependentSequence; // 依赖的上组消费者的序号,如果当前为第一组则为cursorSequence(即生产者发布游标序列),否则使用FixedSequenceGroup封装上组消费者序列
        private volatile boolean alerted = false; // 当触发halt时,将标记alerted为true
        private final Sequence cursorSequence; // AbstractSequencer中的cursor引用,记录当前发布者发布的最新位置
        private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer
    
        public ProcessingSequenceBarrier(
            final Sequencer sequencer,
            final WaitStrategy waitStrategy,
            final Sequence cursorSequence,
            final Sequence[] dependentSequences)
        {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) // 依赖的上一组序列长度,第一次是0
            {
                dependentSequence = cursorSequence;
            }
            else // 将上一组序列数组复制成新数组保存,引用不变
            {
                dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
        }
    
        @Override
        public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException
        {
            // 检查是否停止服务
            checkAlert();
            // 获取最大可用序号 sequence为给定序号,一般为当前序号+1,cursorSequence记录生产者最新位置,
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    
            if (availableSequence < sequence)
            {
                return availableSequence;
            }
            // 返回已发布最高的序列值,将对每个序号进行校验
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);
        }
    
        // ... 
    }
    

    0x02.3 该用什么姿势等待可用事件-WaitStrategy

    看来实际的等待操作还是在WaitStrategy#waitFor完成的。

    // WaitStrategy.java
    /**
    * Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}. <br>
    * 消费者等待可用事件的策略
    */
    public interface WaitStrategy
    {
        /**
        * Wait for the given sequence to be available.  It is possible for this method to return a value
        * less than the sequence number supplied depending on the implementation of the WaitStrategy.  A common
        * use for this is to signal a timeout.  Any EventProcessor that is using a WaitStrategy to get notifications
        * about message becoming available should remember to handle this case.  The {@link BatchEventProcessor} explicitly
        * handles this case and will signal a timeout if required.
        *
        * @param sequence          to be waited on. 给定序号
        * @param cursor            the main sequence from ringbuffer. Wait/notify strategies will
        *                          need this as it's the only sequence that is also notified upon update. 生产者游标
        * @param dependentSequence on which to wait. 依赖的序列,一般是上一个消费者组序列的FixedSequenceGroup封装。如果消费者是第一组,则为cursor。
        * @param barrier          the processor is waiting on. 在等待时需要判断是否对消费者有alert操作
        * @return the sequence that is available which may be greater than the requested sequence.
        * @throws AlertException      if the status of the Disruptor has changed.
        * @throws InterruptedException if the thread is interrupted.
        * @throws TimeoutException
        */
        long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException, TimeoutException;
    
        /**
        * Implementations should signal the waiting {@link EventProcessor}s that the cursor has advanced. <br>
        * 当生产者发布新事件后,将通知等待的EventProcessor。当用锁机制时才会包含相应逻辑。
        */
        void signalAllWhenBlocking();
    }
    
    在各种等待策略中,我们选取阻塞策略研究。
    public final class BlockingWaitStrategy implements WaitStrategy
    {
        private final Lock lock = new ReentrantLock();
        private final Condition processorNotifyCondition = lock.newCondition();
    
        @Override
        public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
            throws AlertException, InterruptedException
        {
            long availableSequence;
            if (cursorSequence.get() < sequence) // 当前游标小于给定序号,也就是无可用事件
            {
                lock.lock();
                try
                {
                    while (cursorSequence.get() < sequence) // 当给定的序号大于生产者游标序号时,进行等待
                    {
                        barrier.checkAlert();
                        // 循环等待,在Sequencer中publish进行唤醒;等待消费时也会在循环中定时唤醒。
                        // 循环等待的原因,是要检查alert状态。如果不检查将导致不能关闭Disruptor。
                        processorNotifyCondition.await();
                    }
                }
                finally
                {
                    lock.unlock();
                }
            }
    // 给定序号大于上一个消费者组最慢消费者(如当前消费者为第一组则和生产者游标序号比较)序号时,需要等待。不能超前消费上一个消费者组未消费完毕的事件。
    // 那么为什么这里没有锁呢?可以想一下此时的场景,代码运行至此,已能保证生产者有新事件,如果进入循环,说明上一组消费者还未消费完毕。
    // 而通常我们的消费者都是较快完成任务的,所以这里才会考虑使用Busy Spin的方式等待上一组消费者完成消费。
            while ((availableSequence = dependentSequence.get()) < sequence)
            {
                barrier.checkAlert();
            }
    
            return availableSequence;
        }
    
        @Override
        public void signalAllWhenBlocking()
        {
            lock.lock();
            try
            {
                processorNotifyCondition.signalAll();
            }
            finally
            {
                lock.unlock();
            }
        }
    
        @Override
        public String toString()
        {
            return "BlockingWaitStrategy{" +
                "processorNotifyCondition=" + processorNotifyCondition +
                '}';
        }
    }
    

    阻塞等待策略使用Lock+Condition的方式等待生产者生产可用事件,而使用Busy Spin的方式等待可能出现的上一个消费者组未消费完成的情况。
    这里给我们一个提示,在构建低延迟系统时,因为锁的性能消耗,尽量不要使用锁。如果必须要用锁,也要把锁粒度调到最小。
    另外,消费者在等待可用消费事件时,会循环调用barrier.checkAlert(),再去调用锁的条件等待,等待可用消费事件。
    有三个地方可以唤醒等待中的消费线程。两种是在Sequencer实现类中,一是有可用事件发布,通知消费线程继续消费;二是在调用next()获取可用的RingBuffer槽位时,发现RingBuffer满了(生产者速度大于消费者,导致生产者没有可用位置发布事件),将唤醒消费者线程,此功能在3.3.5版本新增(Resignal any waiting threads when trying to publish to a full ring buffer )。开始我百思不得,为什么要在buffer满了的时候不断唤醒消费者线程,直到看到这个issue才明白。大意是在log4j2中使用Disruptor时发生了死锁,为了避免在发布事件时,由于某种原因导致没有通知到消费者,在生产者尝试往一个已满的buffer发布数据时,就会再通知消费者进行消费。而这个bug最终也被Log4j认领,与Disruptor无关。Disruptor这里的再次通知也是为了更加保险。

    //*ProducerSequencer.java
    // next(n)中的代码
    // 由于慢消费者,无可用坑位,只有当消费者消费,向前移动后,才能跳出循环
    // 由于外层判断使用的是缓存的消费者序列最小值,这里使用真实的消费者序列进行判断,并将最新结果在跳出while循环之后进行缓存
    while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
    {  // 唤醒等待的消费者,正常情况下并无意义,只是为了避免极少数情况下未知原因导致的发布时锁机制出现异常,未通知到消费者
        waitStrategy.signalAllWhenBlocking();
        LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
    }
    

    还有一种唤醒就是关闭Disruptor时,消费者关闭前将会处理完当前批次数据(并非RingBuffer的所有数据,而是此次循环取出的最大可用序号以下的所有未处理数据),如果消费者线程当前在等待状态,将被唤醒并终结。
    BatchEventProcessor就讲到这。

    0x02.4 使用WorkProcessor多线程处理事件

    下面说一说WorkHandler+WorkProcessor。
    上面讲过,使用EventHandler+BatchEventProcessor这种方式类似JMS的发布订阅,同一个事件会被不同线程的EventHandler并行消费。那么,如果单线程处理能力不足,想多线程处理同一主题下的不同事件该怎么办呢?这种方式就类似JMS的点到点模式,多个消费者可以监听同一个队列,谁先拿到就归谁处理。
    在Disruptor中使用WorkHandler+WorkProcessor实现以上功能。当需要使用这种模式,可在设置Disruptor消费者时,通过使用handleEventsWithWorkerPool和thenHandleEventsWithWorkerPool设置消费链。

    disruptor
        .handleEventsWithWorkerPool(
          new WorkHandler[]{
              journalHandler,
              journalHandler,
              journalHandler
          }
        )
        .thenHandleEventsWithWorkerPool(resultHandler);
    

    先看下相关的源码。

    // Disruptor
    public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
    {
        return createWorkerPool(new Sequence[0], workHandlers);
    }
    
    EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
    {
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
    
        consumerRepository.add(workerPool, sequenceBarrier);
    
        final Sequence[] workerSequences = workerPool.getWorkerSequences();
    
        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
    
        return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
    }
    
    // WorkerPool.java WorkerPool构造方法
    public WorkerPool(
        final RingBuffer<T> ringBuffer,
        final SequenceBarrier sequenceBarrier,
        final ExceptionHandler<? super T> exceptionHandler,
        final WorkHandler<? super T>... workHandlers)
    {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        workProcessors = new WorkProcessor[numWorkers];
    
        for (int i = 0; i < numWorkers; i++)
        {
            workProcessors[i] = new WorkProcessor<T>( // 为每个WorkHandler新建一个WorkProcessor
                ringBuffer,
                sequenceBarrier,
                workHandlers[i],
                exceptionHandler,
                workSequence);
        }
    }
    

    在使用线程池处理事件时,与单线程处理相比,最大的不同在于新增了一个WorkerPool。WorkerPool用于管理一组WorkProcessor,它的属性、方法如下。

    image.png

    WorkProcessor的原理和BatchEventProcessor类似,只是多了workSequence用来保存同组共用的处理序列。在更新workSequence时,涉及多线程操作,所以使用CAS进行更新。
    WorkProcessor的run()方法如下。

    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();
    
        notifyStart();
    
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                if (processedSequence) // 表示nextSequence序号的处理情况(不区分正常或是异常处理)。只有处理过,才能申请下一个序号。
                {
                    processedSequence = false;
                    do
                    {
                        // 同组中多个消费线程有可能会争抢一个序号,使用CAS避免使用锁。
                        // 同一组使用一个workSequence,WorkProcessor不断申请下一个可用序号,对workSequence设置成功才会实际消费。
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                // 缓存的可用序号比要处理的序号大,才能进行处理
                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else // 更新缓存的可用序列。这个cachedAvailableSequence只用在WorkProcessor实例内,不同实例的缓存可能是不一样的
                {     // 和单线程模式类似,返回的也是最大可用序号
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }
    
        notifyShutdown();
    
        running.set(false);
    }
    

    代码逻辑和BatchEventProcessor类似,就不再赘述啦。
    还有一点需要留意,Disruptor通过EventHandlerGroup代表一个消费者组,就表示之前那四张图中一个水平线上的消费者组。这样不同的消费者组之间不必关心各自的实现,从而可以实现更加复杂和灵活的消费链,即依赖图表。

    0x03 消费者小结

    从小语文老师就教育我们写作文要总结,好习惯不能忘~
    本文主要探讨了Disruptor消费者内部概要实现,重点阐述了BatchEventProcessor、WorkProcess的消费代码原理。同时省略了超时通知、开始和结束通知、异常控制等内容,并非不重要,而只是尽量言简意赅,达到抛砖引玉的目的。
    BatchEventProcessor主要用于处理单线程并行任务,同一消费者组的不同消费者会接收相同的事件,并在所有事件处理完毕后进入下一消费者组进行处理(是不是类似JUC里的Phaser、CyclicBarrier或CountDownLatch呢)。WorkProcessor通过WorkerPool管理多个WorkProcessor,达到多线程处理事件的目的,同一消费者组的多个WorkProcessor不会处理同一个事件。通过选择不同的WaitStragegy实现,可以控制消费者在没有可用事件处理时的等待策略。
    好啦,有关Disruptor消费者的分享就到这。
    欢迎大家留言讨论,一同探讨,一同进步。

    相关文章

      网友评论

      • print_log:问下,BatchEventProcessor 为什么一个消息会有被多个消费者并行处理,那这样的话,岂不是会出现 final long availableSequence = sequenceBarrier.waitFor(nextSequence); 多个线程waitFor 一个相同的sequence?
        coder_jerry:这种情况为什么要用一个sequenceBarrier呢?

      本文标题:解读Disruptor系列--解读源码(3)之消费者

      本文链接:https://www.haomeiwen.com/subject/dkeiextx.html