美文网首页
Disruptor深度解析-RingBuffer

Disruptor深度解析-RingBuffer

作者: solo_sky | 来源:发表于2020-02-01 00:25 被阅读0次

    前言

    RingBuffer是Disruptor框架负责数据存储的模块,大部分文章也将其称之为环形缓存区,本文将对其实现原理进行深度探究。

    本文依赖Disruptor版本为3.4.3

    继承体系

    RingBuffer继承体系类图

    实现接口说明

    • DataProvider:获取指定位置的数据;
    • EventSink:发布事件到RingBuffer的接口定义,提供了各种事件发布的操作方法;
    • Cursored:获取当前游标位置;在Disruptor中都是依赖于Sequence的value;
    • Sequenced:提供了RingBuffer相关的sequence判断操作方法,包括获取大小、占用slot、发布等方法;

    继承类说明

    • RingBufferFields:实现RingBuffer内存预分配以及数据定位等功能,此类非常关键,后续进行详细介绍;
    • RingBufferPad:解决缓存行伪共享的填充类;

    RingBuffer初始化流程

    //  RingBuffer的构造方法
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
            // 调用父类即RingBufferFields的构造方法
            super(eventFactory, sequencer);
    }
    

    可以看出RingBuffer的初始化流程逻辑均在其父类中,我们详细分析RingBufferFields类的实现逻辑;

    abstract class RingBufferFields<E> extends RingBufferPad {
        // 用来预留填充的单侧slot个数,加速计算
        private static final int    BUFFER_PAD;
        // RingBuffer中首个slot的偏移位置
        private static final long   REF_ARRAY_BASE;
        // log2(scale),该值与BUFFER_PAD作用相似,主要用来加速计算
        private static final int    REF_ELEMENT_SHIFT;
        // 反射拿到UNSAFE对象
        private static final Unsafe UNSAFE = Util.getUnsafe();
    
        static {
            // 返回Object数组中一个元素指针占用的内存大小
            final int scale = UNSAFE.arrayIndexScale(Object[].class);
            // 指针占4个字节
            if (4 == scale) {
                REF_ELEMENT_SHIFT = 2;
                // 指针占8个字节
            } else if (8 == scale) {
                REF_ELEMENT_SHIFT = 3;
            } else {
                throw new IllegalStateException("Unknown pointer size");
            }
            // BUFFER_PAD的取值为16/32,后续计算使用
            BUFFER_PAD = 128 / scale;
            // 计算数组首元素的偏移大小,往后偏移了128个字节,128/8=16
            REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + 128;
        }
    
        // 数组的最大下标,即bufferSize - 1
        private final   long      indexMask;
        // 对象数组,RingBuffer中实际数据载体
        private final   Object[]  entries;
        // RingBuffer的长度
        protected final int       bufferSize;
        // 序列器,分为单生产者和多生产者,用来在生产者和消费者之间传递数据,此处暂时略过
        protected final Sequencer sequencer;
    
        RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
            this.sequencer = sequencer;
            this.bufferSize = sequencer.getBufferSize();
    
            if (bufferSize < 1) {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            }
            // bufferSize必须为2^N,否则进行报错
            if (Integer.bitCount(bufferSize) != 1) {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
            // 最大下标
            this.indexMask = bufferSize - 1;
            // 预留32/64个空槽位
            this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
            /**
             * 一次性填充满整个数组,从BUFFER_PAD的下一位开始;
             */
            fill(eventFactory);
        }
    
        private void fill(EventFactory<E> eventFactory) {
            // 预留BUFFER_PAD个空slot
            for (int i = 0; i < bufferSize; i++) {
                entries[BUFFER_PAD + i] = eventFactory.newInstance();
            }
        }
    
        @SuppressWarnings("unchecked")
        protected final E elementAt(long sequence) {
            // 元素内存偏移量=base + index * scale 
            // REF_ARRAY_BASE= UNSAFE.arrayBaseOffset(Object[].class) + 128;
            // 因为RingBuffer的长度为2^N,所以(sequence & indexMask) = sequence % bufferSize = index
            // index<<REF_ELEMENT_SHIFT = index << (2 ^ (log2(scale))) = index * scale
            return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
        }
    }
    

    上述代码分析中,有几处关键点如下:

    • 所谓的环形缓冲区实质上就是一个连续数组,只是包装出来了环形的概念;
    • RingBuffer预分配内存时在数组的左右俩侧各预留了BUFFER_PAD个slot;
    • 内存预分配时从BUFFER_PAD处开始,在64位JDK中,假设开启指针压缩,则相当于左侧预留32个slot,右侧预留32个slot;
    • 在RingBuffer长度为2^N时,通过sequence & (bufferSize - 1)加速定位元素实际下标索引,通过结合<<操作实现乘法;
    • 首个元素的内存偏移位置是固定的,即128位,因为BUFFER_PAD * scale=128;

    在MultiProducerSequencer中后续通过>>>实现除法,作用类似;

    为何预留slot?

    在现代CPU中,缓存行伪共享是一个经典问题,即CPU一次load的L1缓存行大小是固定的,如果两个不同的元素被load到同一缓存行中,那么任何一个元素的修改都会导致另一元素在其它CPU核对应的缓存失效,这个问题在很多博文中都有专门讲解,此处不再赘述。引用结论就是单个频繁修改元素尽量不和其它元素在同一缓存行中。
    对于entries对象,左右两侧各预留了128 bytes,这就保证了缓存行在<=128bytes时,entries对象一定不会和其它对象共享缓存行。

    Sequence是什么?

    Sequence是针对RingBuffer中slot位置的一个描述,与AtomicLong的功能几乎一致,其在AtomicLong的基础上解决了缓存行伪共享的问题,其继承关系如下:



    其核心元素为value,根据Java的内存分配,value两次各8个long对象,即大小为8 * 8=64 bytes,那么在缓存行<=64 bytes时,可以避免缓存行伪共享问题。
    下面对其关键的两个方法做下简单分析:

    /**
         * Perform an ordered write of this sequence.  The intent is a Store/Store barrier between this write and any previous store.
         *
         * putOrderedLong在执行时使用store/store内存屏障,因此其在可见性上会存在一定的延时,但是由于其未使用store/load屏障,因此其写入是无阻塞的
         */
        public void set(final long value) {
            UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
        }
    
        /**
         * Performs a volatile write of this sequence.  The intent is a Store/Store barrier between this write and any previous write and a
         * Store/Load barrier between this write and any subsequent volatile read.
         *
         * 本方法支持volatile语义,即在写入前插入store/store内存屏障,写入后插入store/load内存屏障,因此值的修改在其它线程是立即可见的
         */
        public void setVolatile(final long value) {
            UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
        }
    

    Sequencer是什么?

    前面讲完了Sequence,是时候推出Sequencer了,Sequencer是Disruptor中非常核心的一个概念,其作为生产者和消费者的桥梁,通过维护两者的sequence来实现数据的扭转;


    sequencer.img

    从继承体系上来看,sequencer主要分为单生产者和多生产者两类,这两者实现了各种free-lock的算法,此处以SingleProducerSequencer为例分析。

    AbstractSequencer

    该类主要属性说明如下:

    // JDK提供的基于反射机制的volatile原子更新工具类,本更新器用来更新gatingSequences属性
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    
    // RingBuffer中环形缓冲区的大小
    protected final    int          bufferSize;
    // 消费者等待策略
    protected final    WaitStrategy waitStrategy;
    // 生产者sequence,初始位置为-1
    protected final    Sequence     cursor          = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // 消费者sequence集合
    protected volatile Sequence[]   gatingSequences = new Sequence[0];
    

    该类主要方法说明如下:

        /**
         * 增加消费者sequence列表,该方法功能由SequenceGroups代理,本质上还是AtomicReferenceFieldUpdater进行更新
         */
        @Override
        public final void addGatingSequences(Sequence... gatingSequences) {
            SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
        }
    
        /**
         * 移除消费者sequence列表,该方法功能由SequenceGroups代理,本质上还是AtomicReferenceFieldUpdater进行更新
         */
        @Override
        public boolean removeGatingSequence(Sequence sequence) {
            return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
        }
    
        /**
         * 获取消费者sequence列表中最小的索引下标值,cursor.get()为生产者下标值
         */
        @Override
        public long getMinimumSequence() {
            return Util.getMinimumSequence(gatingSequences, cursor.get());
        }
    
    

    SingleProducerSequencer

    SingleProducerSequencer中最主要的属性为nextValue和cachedValue,其中nextValue表示获取下一个可用slot的索引下标,而cachedValue则是当前消费者消费最慢slot的索引下标,下面以next(int n)方法为例分析生产者获取可用sequence的逻辑。

    /**
     * 申请N个可用slot,用来存放生产出来的数据
     * @param n 可用sequence的个数
     */
    public long next(int n) {
            // 申请个数不能<1,不能>bufferSize
            if (n < 1 || n > bufferSize) {
                throw new IllegalArgumentException("n must be > 0 and < bufferSize");
            }
    
            // 当前slot下标,初始时为-1
            long nextValue = this.nextValue;
    
            // 申请可用槽位的最大下标,默认一次申请1个,所以初始时位0
            long nextSequence = nextValue + n;
            // 倒转1轮,与消费者进行比较
            long wrapPoint = nextSequence - bufferSize;
    
            // 消费者中记录的最小slot下标,默认为-1
            long cachedGatingSequence = this.cachedValue;
    
            // wrapPoint > cachedGatingSequence,主要用来判断是否出现环路,即生产者生产速度过快,追上了消费者;
            // cachedGatingSequence > nextValue,主要用来判断是否消费者消费过快,追上了生产者;
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
                // 设置生产者要使用的slot下标,底层为volatile语义
                cursor.setVolatile(nextValue);  // StoreLoad fence
                
                long minSequence;
                // 重新最小slot值,此处分两种情况讨论:
                // 如果nextValue<gatingSequences最小值,说明消费者消费过快,那么此时wrapPoint=nextValue + n-bufferSize<nextValue,直接跳出循环,消费者处直接等待;
                // 如果nextValue>gatingSequences最小值,那么判断是否出现环路,如果不出现,那么直接分配,否则,生产者等待;
                while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                    // 生产者等待
                    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
                }
    
                this.cachedValue = minSequence;
            }
            // 更新生产者已使用的slot下标
            this.nextValue = nextSequence;
    
            return nextSequence;
    }
    

    从上述的代码分析来看,申请可用slot主要通过生产者和消费者的sequence进行比较,单生产者与多生产者的差别仅在于多生产者对sequence的更新可能产生争用,因此使用CAS机制进行更新;

    publishEvent

    事件发布也是RingBuffer提供的一个重要功能,其相当于为生产者提供了数据操作的入口,通过前面的继承体系可以看出,事件相关操作是由EventSink接口提供的,下面对其先进行一个简单的分析。

    public void publishEvent(EventTranslator<E> translator) {
            // 占用slot位置
            final long sequence = sequencer.next();
            // 发布事件
            translateAndPublish(translator, sequence);
    }
    ......
    private void translateAndPublish(EventTranslator<E> translator, long sequence) {
            try {
                // 事件转换
                translator.translateTo(get(sequence), sequence);
            } finally {
                // 更新生产者sequence的值
                sequencer.publish(sequence);
            }
    }
    ......
    public void publish(long sequence) {
            // 更新生产者的value
            cursor.set(sequence);
            // 唤醒消费者线程,这个与等待策略有关,此处先不具体讨论
            waitStrategy.signalAllWhenBlocking();
    }
    

    结合上面next()的实现源码,我们大致可以得出如下结论,以单事件发布为例,其是一个两阶段操作

    • 预占用slot,更新sequencer的nextValue,这样下次分配时从新的nextValue往下分配
    • 对slot的具体数据进行更新,同时更新生产者的sequence

    由于是单生产者,这种操作方式不会产生并发问题。但是在多生产者中,逻辑会有问题,比如如果在第二步中更新sequence,在一定程度上会存在顺序错乱的问题,因此在预申请的时候就要更新生产者sequence,同时如何判断数据发布成功了呢?我们后面揭晓。

    MultiProducerSequencer

        // UNSAFE对象实例
        private static final Unsafe UNSAFE = Util.getUnsafe();
        // 获取int数组的首个元素偏移位置
        private static final long   BASE   = UNSAFE.arrayBaseOffset(int[].class);
        // 返回一个int数组中对象指针占用的内存大小
        private static final long   SCALE  = UNSAFE.arrayIndexScale(int[].class);
    
        // 缓存的消费者最慢slot的下标
        private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    
        // availableBuffer tracks the state of each ringbuffer slot
        // see below for more details on the approach
        private final int[] availableBuffer;
    
        // ringbuffer最大下标
        private final int indexMask;
    
        // log2(bufferSize)
        private final int indexShift;
    

    通过这些属性定义,我们似乎有一点熟悉,比如BASE、SCALE、indexMask、indexShift这些概念,实际上这些值主要也是为了加速计算,其中比较让人懵圈的是int数组availableBuffer,所以可以想象BASE这些主要是为了对其进行内存操作;

    public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
            super(bufferSize, waitStrategy);
            // 初始化availableBuffer大小为RingBuffer的大小
            availableBuffer = new int[bufferSize];
            indexMask = bufferSize - 1;
            indexShift = Util.log2(bufferSize);
            initialiseAvailableBuffer();
    }
    
    private void initialiseAvailableBuffer() {
             // 这个写法有点奇怪,提前分配内存,值为-1
            for (int i = availableBuffer.length - 1; i != 0; i--) {
                setAvailableBufferValue(i, -1);
            }
    
            setAvailableBufferValue(0, -1);
    }
    

    OK,分析到这里,我们大致了解了多生产者的一些关键信息,下面回到在单生产者发布事件的问题上来继续探究,我们知道无论是哪种生产者模式,第一步都是申请可用的slot,那下面直接上多生产者申请slot的代码。

    public long next(int n) {
            // 基础的逻辑判断
            if (n < 1 || n > bufferSize) {
                throw new IllegalArgumentException("n must be > 0 and < bufferSize");
            }
            // 生产者实际的sequence值 
            long current;
            // 申请可用slot的最大值
            long next;
    
            do {
                // 实际sequence,初始时为-1
                current = cursor.get();
                // 申请可用slot的最大值,初始时为0
                next = current + n;
                
                // 下面与单生产者类似
                long wrapPoint = next - bufferSize;
                long cachedGatingSequence = gatingSequenceCache.get();
                // 这里补充一点,cachedGatingSequence是缓存的上一次可能出现环路或消费过快时的消费者最慢slot的下标,正常申请时可能存在多轮未更新
                // 因此理论上cachedGatingSequence可能并不是最新的值,且其值相比实际值较小;
                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                    // 获取最小的sequence值
                    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                    // 如果出现环路,等待然后继续循环
                    if (wrapPoint > gatingSequence) {
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }
                    // 未出现环路,更新消费者的sequence值
                    gatingSequenceCache.set(gatingSequence);
                // CAS更新生产者的sequence,实现free-lock
                } else if (cursor.compareAndSet(current, next)) {
                    // 更新成功跳出
                    break;
                }
            } while (true);
          
            return next;
    }
    

    从上述可以看出,next方法在多生产者中永远都依赖生产者的实际sequence值进行计算,且占用成功后实时更新;

    这里是与单生产者模式的第一个差别,单生产者中计算依赖的是其自身的nextValue值,其通过publish才会实际更新生产者游标,所以单生产者是一个典型的二阶段提交;

    对于多生产者来说,由于生产者游标被实时更新掉了,那消费者依据什么来判断数据已经替换(发布)成功了呢?查看MultiProducerSequencer的publish方法,代码如下:

    public void publish(final long sequence) {
            // 关键代码
            setAvailable(sequence);
            waitStrategy.signalAllWhenBlocking();
    }
    

    继续往下翻看setAvailable的代码:

    private void setAvailable(final long sequence) {
            setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }
    ...
    private int calculateAvailabilityFlag(final long sequence) {
            // 计算值=sequence / bufferSize,即该sequence属于哪一圈
            return (int) (sequence >>> indexShift);
    }
    
    private int calculateIndex(final long sequence) {
            // 计算值=sequence % bufferSize,即该sequence的实际数组下标
            return ((int) sequence) & indexMask;
    }
    // 由上可知,availableBuffer中记录了对应sequence的实际圈数(round)
    private void setAvailableBufferValue(int index, int flag) {
            // 计算偏移量
            long bufferAddress = (index * SCALE) + BASE;
            UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }
    

    由上我们可知,availableBuffer中记录了最新sequence对应的圈数,这里我们先临时插入一个消费者的代码片段,在Disruptor中,消费者的接口主要为EventProcessor,这里以WorkProcessor为例,代码如下:

    if (cachedAvailableSequence >= nextSequence) {
      // 获取下一个工作序列
      event = ringBuffer.get(nextSequence);
      // 处理序列
      workHandler.onEvent(event);
      // 更新标记为true
      processedSequence = true;
    } else {
      // 获取可处理的最大sequence序列
      cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
    }
    ...
    public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
      checkAlert();
      // 获取可用的序列
      long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    
      if (availableSequence < sequence) {
        return availableSequence;
      }
      // 关键在这里 !import
     return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    

    可知消费者消费数据时强依赖sequencer.getHighestPublishedSequence的方法,多生产者的代码实现如下:

    public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
            for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
                // 关键代码 !import
                if (!isAvailable(sequence)) {
                    return sequence - 1;
                }
            }
            return availableSequence;
    }
    ...
    public boolean isAvailable(long sequence) {
            // %,计算实际的下标
            int index = calculateIndex(sequence);
            // 计算其round
            int flag = calculateAvailabilityFlag(sequence);
            long bufferAddress = (index * SCALE) + BASE;
            // 判断其是否与标识位相同,如果不相同,则代码数据未publish完成
            return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
    }
    

    由上整个分析基本结束,即消费者依靠availableBuffer的值来判断该数据是否已经发布完毕;

    总结

    本文主要对RingBuffer的数据结构、生产者以及数据发布流程进行了一个大致的分析,但其中有很多的细节还未来得及分析,比如生产者和消费者之间是如何进行数据联动的,Barrier在其中扮演的角色又是什么,这些留到下一篇文章进行深入探讨。

    相关文章

      网友评论

          本文标题:Disruptor深度解析-RingBuffer

          本文链接:https://www.haomeiwen.com/subject/kfiwthtx.html