美文网首页
Disruptor深度解析-消费者Consumer

Disruptor深度解析-消费者Consumer

作者: solo_sky | 来源:发表于2020-02-01 00:26 被阅读0次

    前言

    上一篇文章介绍了RingBuffer的基本信息,本文将对Disruptor的消费者进行进一步的解析,并对其中可能存在的坑点进行分析;

    消费者继承体系


    从接口体系上来看,消费者主要分为Work和Event两种类型,这两种类型的差别如下:

    • 同一层次的WorkProcessor只有一个可以处理成功RingBuffer的事件,类似消息体系中的点对点模式;
    • 同一层次的BatchEventProcessor并行处理(每一个消费者)成功RingBuffer事件,类似消息体系中的Topic模式

    这两种实现上差别到底在哪里呢,我们进一步进行分析。从接口上,我们可以看到它们都继承了Runnable,所以联想到Disruptor在使用时生成的Executor,我们可以猜测不同的消费者都是在不同的线程中进行处理的,我们直接对其进行分析。

    WorkProcessor

    主要属性:

        // Processor是否已经开始运行
        private final AtomicBoolean running = new AtomicBoolean(false);
        // 序列,初始时为-1
        private final Sequence        sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
        // 持有的RingBuffer引用
        private final RingBuffer<T>   ringBuffer;
        // Processor上游的序列屏障
        private final SequenceBarrier sequenceBarrier;
        // 用户自定义的业务逻辑处理器
        private final WorkHandler<? super T>      workHandler;
        // 异常处理器
        private final ExceptionHandler<? super T> exceptionHandler;
        // 消费的sequence
        private final Sequence                    workSequence;
        
        private final EventReleaser eventReleaser = new EventReleaser() {
            @Override
            public void release() {
                sequence.set(Long.MAX_VALUE);
            }
        };
        // 超时处理器
        private final TimeoutHandler timeoutHandler;
    

    上述关键属性分析下来,主要有sequence和workSequence两个属性有一定的迷惑性,探究下它们是如何来的:

    // 构造方法
    public WorkProcessor(
                final RingBuffer<T> ringBuffer,
                final SequenceBarrier sequenceBarrier,
                final WorkHandler<? super T> workHandler,
                final ExceptionHandler<? super T> exceptionHandler,
                final Sequence workSequence) {
            ...
            // 上游传递进来的
            this.workSequence = workSequence;
            ...
        }
    

    继续往上翻:

    public WorkerPool(
                final RingBuffer<T> ringBuffer,
                final SequenceBarrier sequenceBarrier,
                final ExceptionHandler<? super T> exceptionHandler,
                final WorkHandler<? super T>... workHandlers) {
            this.ringBuffer = ringBuffer;
            final int numWorkers = workHandlers.length;
            workProcessors = new WorkProcessor[numWorkers];
            // 循环构造processor,共享workSequence
            for (int i = 0; i < numWorkers; i++) {
                workProcessors[i] = new WorkProcessor<>(
                        ringBuffer,
                        sequenceBarrier,
                        workHandlers[i],
                        exceptionHandler,
                        workSequence);
            }
        }
    

    可以发现,同一WorkPool的processor共享同一个sequence,因此其实所谓的只有一个能够消费成功本质上依靠的就是同一个sequence(volatile语义)。
    在Disruptor的使用中,我们一般需要手动调用下Disruptor#start()方法来启动整个框架:

      public RingBuffer<T> start(final Executor executor) {
            if (!started.compareAndSet(false, true)) {
                throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
            }
            // 生产者cursor,初始时为-1
            final long cursor = ringBuffer.getCursor();
            workSequence.set(cursor);
    
            for (WorkProcessor<?> processor : workProcessors) {
                processor.getSequence().set(cursor);
                executor.execute(processor);
            }
    
            return ringBuffer;
        }
    

    所以由此判断,框架启动时workSequence和sequence的值都与生产者保持一致即-1,此时我们回到WorkProcessor的run()方法。

    public void run() {
            // 判断是否重复启动
            if (!running.compareAndSet(false, true)) {
                throw new IllegalStateException("Thread is already running");
            }
            // 清除警告状态
            sequenceBarrier.clearAlert();
            // 如果有实现LifecycleAware接口,回调其onStart()逻辑
            notifyStart();  
    
            // 上一个sequence的slot是否处理成功,刚开始为-1,所以默认处理成功
            boolean processedSequence = true;
            // 缓存的可用sequence的下标
            long cachedAvailableSequence = Long.MIN_VALUE;
            // 下一个待处理的sequence
            long nextSequence = sequence.get();
            T event = null;
            while (true) {
                try {
                    // 如果上一个处理成功,则更新workSequence的值
                    if (processedSequence) {
                        processedSequence = false;
                        do {
                            // 从当前workSequence的后移1位,为新的需要处理的序列
                            nextSequence = workSequence.get() + 1L;
                            // sequence存储该processor已经处理成功的序列最大值,初始时为-1;
                            sequence.set(nextSequence - 1L);
                        }
                        // workSequence所有processor线程共享同一变量
                        // 假设此时发生并发问题,由于其它线程已经处理成功,那么此处更新失败,则下次nextSequence可能获取到跳跃的值
                        while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                    }
                    // 逻辑走到这里,可以确定一个事情:sequence的值=workSequence的值-1,所以这里workSequence
                    // 本质上代表了该Processor成功抢占成功可以处理的sequence数据
                    // cachedAvailableSequence为等待之后下一个可用的序列
                    if (cachedAvailableSequence >= nextSequence) {
                        // 获取workSequence对应的slot数据
                        event = ringBuffer.get(nextSequence);
                        // 调用用户逻辑进行处理
                        workHandler.onEvent(event);
                        // 处理成功,更新标记为true
                        processedSequence = true;
                    } else {
                        // 说明当前processor抢占到的sequence可用数据超前,需要判断该sequence数据是否可用
                        cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                    }
                } catch (final TimeoutException e) {
                    // 调用对应的TimeoutHandler进行处理
                    notifyTimeout(sequence.get());
                } catch (final AlertException ex) {
                    // 当发生警告通知时,如果该线程状态已经被暂停,则直接中断
                    if (!running.get()) {
                        break;
                    }
                } catch (final Throwable ex) {
                    // 处理异常,将该sequence标识为处理成功  !important
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    processedSequence = true;
                }
            }
    
            notifyShutdown();
    
            running.set(false);
        }
    

    从run()的代码逻辑可以看出,不同的work消费者都是通过CAS来抢占需要处理的slot数据,每个processor维护了自身已经处理成功的sequence以及大家公共持有的workSequence,同时该processor是否可以处理sequence是由barrier来维护的。有一点需要注意,在exceptionHandler.handleEventException(ex, nextSequence, event);中,默认的异常处理器为FatalExceptionHandler,其打印日志结束后会抛出RuntimeException,从而导致消费者线程中断,所以在实际使用中一定要实现业务自己的ExceptionHandler或者在WorkHandler中自己处理异常

    那么这里每个processor自身的sequence有什么作用?
    思考下如下场景,假设现在有10个消费者,现在有10个slot需要处理,那么极端情况下可能workSequence被更新到了10,但是可能最小的sequence此时为0,说明第一个申请sequence成功的线程还未处理完毕,那么这整批消费者(一个WorkPool)最慢的下标其实就是0,一旦其处理成功,则sequence就有可能被更新为10;所以这里sequence的集合其实就是用来标识整个消费者中最慢的进度;

    关于waitFor

    上面提到了消费者能否处理sequence是由SequenceBarrier#waitFor来决定的,下面探究下该方法的实现机制。

    public long waitFor(final long sequence)
                throws AlertException, InterruptedException, TimeoutException {
            checkAlert();
            // 转移职责,交由waitStrategy处理
            long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    
            if (availableSequence < sequence) {
                return availableSequence;
            }
            // 该方法在上一期已经分析过,此处不再分析
            return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    ...
    // 以BlockingWaitStrategy为例
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
                throws AlertException, InterruptedException {
            long availableSequence;
            // 要消费的sequence超过了生产者的sequence,则消费者等待;
            if (cursorSequence.get() < sequence) {
                synchronized (mutex) {
                    while (cursorSequence.get() < sequence) {
                        barrier.checkAlert();
                        mutex.wait();
                    }
                }
            }
            // 该processor要消费的sequence超过了上游processor的最小值,自旋等待
            while ((availableSequence = dependentSequence.get()) < sequence) {
                barrier.checkAlert();
                // 尝试寻找实现了onSpinWait的静态方法Thread进行调用
                ThreadHints.onSpinWait();
            }
            // 返回上游最小sequence
            return availableSequence;
    }
    

    BatchEventProcessor

    主要属性

        // 空闲状态
        private static final int IDLE    = 0;
        // 暂停状态
        private static final int HALTED  = IDLE + 1;
        // 运行状态
        private static final int RUNNING = HALTED + 1;
        // 实际运行状态
        private final AtomicInteger               running          = new AtomicInteger(IDLE);
        // 异常处理器
        private       ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
        // RingBuffer
        private final DataProvider<T>             dataProvider;
        // 屏障
        private final SequenceBarrier             sequenceBarrier;
        // 用户业务逻辑处理器
        private final EventHandler<? super T>     eventHandler;
        // 消费者sequence
        private final Sequence                    sequence         = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
        // 超时处理器
        private final TimeoutHandler              timeoutHandler;
        private final BatchStartAware             batchStartAware;
    

    主要属性与WorkProcessor基本类似,查看其run()方法逻辑:

    private void processEvents() {
            T event = null;
            // 从当前sequence后移一个进行消费
            long nextSequence = sequence.get() + 1L;
    
            while (true) {
                try {
                    // 获取可用的sequence长度
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null && availableSequence >= nextSequence) {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }
                    // 如果当前处理的sequence落后,就循环挨个处理
                    while (nextSequence <= availableSequence) {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    // 处理完毕后进行更新
                    sequence.set(availableSequence);
                } catch (final TimeoutException e) {
                    notifyTimeout(sequence.get());
                } catch (final AlertException ex) {
                    if (running.get() != RUNNING) {
                        break;
                    }
                } catch (final Throwable ex) {
                    exceptionHandler.handleEventException(ex, nextSequence, event);  
                    // 业务逻辑出现异常时,若无新的异常抛出,则更新sequence
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
    }
    

    从上面的代码可以看出,假设Disruptor调用halt之后,则该批次数据仍会处理完毕,在新的一轮waitFor判断中抛出AlertException异常;

    关于消费者线程池

    此处把线程池单独拿出来看是有一定原因的,现在Disruptor把显示传入线程池的构造方法置为了@Deprecated,那么我们在使用时应该注意什么呢?
    从前文我们已经大致分析过生产者和消费者是如何协同的,回顾下结论:

    • sequencer会显示维护消费最慢slot的下标;
    • 生产者在发布事件时需要先调用next()进行显示申请或者占用;
    • 消费者线程在消费时会主动调用barrier#waitFor进行判断;
    • 每个消费者占用一个线程;

    这里有一个死锁问题,假设说消费者个数为N,线程个数为M,其中N > M,从前面的代码我们已经分析过,消费者线程的逻辑都是死循环,那么很有可能出现饥饿消费者,即无法被线程池调用,那么从生产者端来看,最慢slot一直未改变,从而导致生产者等待,而生产者等待又会促使消费者waitFor方法无法通过,从而出现互相等待死锁问题;那么Disruptor是如何进行解决的呢,代码如下:

    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) {
            this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
    }
    ...
    // BaseExecutor
    private final ThreadFactory factory;
    private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();
    
    public BasicExecutor(ThreadFactory factory) {
      this.factory = factory;
    }
    
    @Override
    public void execute(Runnable command) {
      final Thread thread = factory.newThread(command);
      if (null == thread) {
        throw new RuntimeException("Failed to create thread to run: " + command);
      }
    
      thread.start();
    
      threads.add(thread);
    }
    

    代码非常简单,即创建足够消费者使用的线程数量进行消费;
    本文先写到这里,下一篇文章对消费者的等待策略进行具体分析。

    相关文章

      网友评论

          本文标题:Disruptor深度解析-消费者Consumer

          本文链接:https://www.haomeiwen.com/subject/khuzthtx.html