美文网首页
Disruptor接口总览

Disruptor接口总览

作者: 有个点丶 | 来源:发表于2021-05-28 17:16 被阅读0次
    BatchStartAware

    onBatchStart(long batchSize) BatchEventProcessor引用

    Cursored

    getCursor() 获取当前指针位置,用来动态添加或者移除序列中的序列

    DataProvider

    get(long sequence) 获取指定sequence位置的元素

    EventFactory

    newInstance() 为RingBuffer用来预创建事件填充RingBuffer

    EventHandler

    onEvent(T event, long sequence, boolean endOfBatch) 用来处理有效事件时,指定位置上的事件回调

    EventProcessor

    继承了Runnable接口,会有线程执行它,去RingBuffer中轮询获取事件,根据使用指定的WaitStrategy获取下一个有效的序列。
    getSequence() 获取被这个Processor引用的Sequence
    halt() 通知Processor在消费下一个事件前暂停时停止
    isRunning() 检查EventProccessor状态

    EventReleaseAware

    setEventReleaser(EventReleaser eventReleaser) 用于设置一个EventReleaser
    目前的EventReleaser的实现是将Sequence设置为Long.MAX_VALUE,只有WorkProcessor有用到这两个。

    EventSequencer

    继承了两个接口,DataProvider、Sequenced,Sequenced接口后面会提到

    EventSink

    各种发布事件接口,这些接口发布的事件与EventTranslator,多参EventTranslator有关。

    EventTranslator

    类似事件适配器,将非Event数据,通过translateTo(T event, long sequence),将数据映射到指定的Event中,sequence是这个事件关联的序列号。

    ExceptionHandler

    比较特殊的一种EventHandler,专门用来监听Disruptor抛出的异常。
    handleEventException(Throwable ex, long sequence, T event);
    handleOnStartException(Throwable ex);
    handleOnShutdownException(Throwable ex);

    LifecycleAware

    在EventHandler中实现的接口,且其中的所有方法只会被通知一次,且仅在线程开始前,与线程结束前。
    onStart()
    onShutdown()

    SequenceBarrier

    用来协调发布者的指针位置和依赖序列的EventProcessor的屏障的数据结构
    long waitFor(long sequence) 获取给定序列相关的有效位置,但不一定会是提交的sequence,最终结果由waitStrategy.waitFor或sequencer.getHighestPublishedSequence提供
    long getCursor() 获取当前可以读取数据的指针位置
    boolean isAlerted() 检查当前的alert状态
    void alert() 进入alert状态,且直到调用clearAlert方法为止
    clearAlert() 清除当前的alert状态
    checkAlert() 检查当前alert状态,如果是,将会抛出AlertException异常。

    Sequenced

    getBufferSize() 返回int类型当前RingBuffer的大小
    hasAvailableCapacity(int requiredCapacity) 检查Buffer中是否有足够的requiredCapacity大小的空间,但由于方法是并发,所以只能作为指示。
    remainingCapacity() 获取当前Buffer剩余容量,返回数值同样仅供参考
    next() 实际分配下个事件的有效序列,会阻塞
    next(int n) 一次性分配n个序列,返回值 - (n -1)为此次的首个序列号,会阻塞
    tryNext() 同next(),但是如果空间不足会立刻返回,而next()会等到分配到sequence为止
    tryNext(int n) 同next(int n),也是与tryNext()一样的中断条件
    publish(long sequence) 当事件发布完成时,发布一个序列号
    publish(long lo, long hi) 批量发布序列号,lo与hi分别是next(int n)返回值 - (n - 1) 和 next(int n)返回值

    Sequencer

    协调序列号的访问与追踪的数据结构
    claim(long sequence) 声明指定的序列,一般用来重置Ring Buffer到指定值
    isAvailable(long sequence) 确认指定学列好是否已经发布,且可以使用,不会阻塞。
    addGatingSequences(Sequence... gatingSequences) 将指定的Gating序列号添加到Disruptor中,过程具有线程安全以及原子性。为了防止未处理事件被写入覆盖,所以Sequencer需要监听所有的Processor处理的Sequence。
    removeGatingSequence(Sequence sequence) 移除指定Gating序列,当指定新的EventHandler
    newBarrier(Sequence... sequencesToTrack) 根据提供的序列对象创建一个新的SequenceBarrier,继续追踪参数中提供的Sequence
    getMinimumSequence() 从所有Gating序列中获取最小的序列号
    getHighestPublishedSequence(long nextSequence, long availableSequence) 获取nextSequence到availableSequence区间的有效的最大的序列号,最小是nextSequence -1。
    newPoller(DataProvider<T> provider, Sequence... gatingSequences) 实验性的poller模型消费者的构造方法

    SequenceReportingEventHandler

    继承EventHandler接口,增加一个setSequenceCallback(Sequence sequenceCallback),用来同时sequenceCallback的序列已经被处理了,但是源码中没有相关的流程
    TimeoutHandler onTimeout(long sequence), 消费流程中,WaitStrategy申请序列时,会抛出TimeoutException,这个时候,就会使用TimeoutHandler的onTimeout(long sequence)方法
    LifecycleAware、SequenceReportingEventHandler、BatchStartAware、TimeoutHandler的实现类都需要实现EventHandler接口,在Processor创建时,会使用instanceof判断类型,并设置相关参数。

    WaitStrategy

    可以让EventProcessor能够自定义申请序列指针的等待策略。
    waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) 根据策略实现,将会等待提供的序列号有效后返回,也可能会返回小于提供的序列号的值
    signalAllWhenBlocking() 唤醒相关为WaitStrategy中的conditional等待的EventProcessor。

    WorkHandler

    感觉像是阉割版的EventHandler,只不过由WorkProcessor来触发WorkHandler的onEvent。

    Disruptor主要的流程有三条,

    1. 申请sequence
    2. 发布需要被消费的sequence
    3. 消费sequence

    首先,申请sequence流程很好理解,取MultiProducerSequencer中的next(int n)为例:

        public long next(int n)  {
            if (n < 1)  {
                throw new IllegalArgumentException("n must be > 0");
            }
    
            long current;
            long next;
    
            do {
                current = cursor.get();
                next = current + n;
    
                long wrapPoint = next - bufferSize; // 必为负数
                long cachedGatingSequence = gatingSequenceCache.get();// 当前sequencer缓存的已消费序列号
                // 如果current小于cachedGatingSequence
                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
                {
                    // 说明此时processor与cursor指针不同步,先取当前最小的seq
                    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
    
                    if (wrapPoint > gatingSequence) // 目前唯一可能出现这种情况就是long类型的值溢出了
                    {
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }
                    gatingSequenceCache.set(gatingSequence);// CAS更新gatingSeq
                }
                else if (cursor.compareAndSet(current, next)) // 如果以消费seq比cursor小,则直接CAS,分配seq
                {
                    break;
                }
            } while (true);
            return next;
        }
    
    

    那么第二步,发布sequence流程,仍然可以在这个类中找到publish()方法:

        // 发布单个
        public void publish(final long sequence) {
            setAvailable(sequence);
            waitStrategy.signalAllWhenBlocking();
        }
        // 批量发布
        public void publish(long lo, long hi) {
            for (long l = lo; l <= hi; l++) {
                setAvailable(l);
            }
            waitStrategy.signalAllWhenBlocking();
        }
    
        private void setAvailable(final long sequence)  {
            setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
        }
    
        private void setAvailableBufferValue(int index, int flag)  {
            long bufferAddress = (index * SCALE) + BASE;
            UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
        }
    
        // indexMask = bufferSize - 1;
        // indexShift = Util.log2(bufferSize);
        private int calculateIndex(final long sequence) {
            return ((int) sequence) & indexMask;
        }
    
        private int calculateAvailabilityFlag(final long sequence) {
            return (int) (sequence >>> indexShift);
        }
    

    最终目的是为了将MultiProducerSequencer中维护的与RingBuffer长度一致的availableBuffer的sequence位置上设置为特定的flag标记。
    完成标记操作后,通过waitStrategy,通知在所有next(int n)方法中阻塞的线程。
    发布操作完成。
    消费流程,则需要到BatchEventProcesser中查看具体实现,EventProcessor是个Runnable的实现类,所以找到run方法,可以发现一个processEvents()方法,其余方法都是相关的监听器触发,可以忽略:

        private void processEvents() {
            T event = null;
            // sequence对象的初始值是-1,也就是消费者最开始的消费起点是0
            long nextSequence = sequence.get() + 1L;
    
            while (true) {// 会一直尝试循环消费,直到waitFor()方法中抛出AlertException时,推出循环
                try {
                    // sequenceBarrier是RingBuffer.newBarrier(Sequence... sequences)创建的
                    // waitFor 获取目前最大的有效seq
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                   // ...... 省略部分代码
                    // 此处通过sequenceBarrier.waitFor(nextSequence),等待或者轮询,直到事件发布到availableSequence为止
                    while (nextSequence <= availableSequence) {// 一直消费到availableSequence下标对应事件为止
                        event = dataProvider.get(nextSequence); // 取RingBuffer获取指定下标的Event
                        // 通知事件回调
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    sequence.set(availableSequence);
                } 
                // ……省略部分catch
                catch (final Throwable ex) {
                    handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
    

    notifyStart()notifyShutdown()会触发LifecycleAware中的方法,在处理事件过程中,也会触发BatchStartAware监听。
    另外,RingBuffer在添加EventHandler时,会为每个EventHandler创建一个EventProcessor,
    gatingSequence数组中的值,由EventProcessor的getSequence()方法关联,然后RingBuffer使用addGatingSequences(Sequence... gatingSequences)方法,让gatingSequence与Sequencer建立关联关系。
    当EventProcessor消费完sequence后,会更新Sequence的值,这样自然而然的Sequencer可以之间监听到每个EventProcessor的消费情况,保证多消费者同时消费时未读Event不会被覆盖。

    每个EventProcessor由独立的sequence是为了让每个Event都能够被每个EventProcessor消费。

    相关文章

      网友评论

          本文标题:Disruptor接口总览

          本文链接:https://www.haomeiwen.com/subject/xlofsltx.html