美文网首页Disruptor
Disruptor - 介绍(1)

Disruptor - 介绍(1)

作者: 晴天哥_王志 | 来源:发表于2018-08-19 10:59 被阅读64次

    开篇

     Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍The LMAX Architecture。同年它还获得了Oracle官方的Duke大奖。其他关于disruptor的背景就不在此多言,可以自己google。

    Disruptor性能

     disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
     官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,本文列出其中一组数据,数据中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue,目测性能只有有5~10倍左右的提升

    image

     完整的官方性能测试数据在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,性能测试的代码已经包含在disruptor的代码中,有兴趣的可以直接过去看看。

    Disruptor原理介绍

    Disruptor组成元素图

    Sequence

     Sequence是Disruptor最核心的组件,上面已经提到过了。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。那么Sequence是什么呢?首先Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。

    RingBuffer

     RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。其缺点是在生产者端判断RingBuffer是否已满是需要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。

    SequenceBarrier

     SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。

    SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

    WaitStrategy

     当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

    • BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
    • BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。
    • SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
    • YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
    • PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。

    为什么RingBuffer这么快,首先抛出两个原因:

    • 首先是CPU false sharing的解决,Disruptor通过将基本对象填充冗余基本类型变量来填充满整个缓存行,减少false sharing的概率,这部分没怎么看懂,Disruptor通过填充失效这个效果

    • 无锁队列的实现,对于传统并发队列,至少要维护两个指针,一个头指针和一个尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁。Disruptor由于是环状队列,对于Producer而言只有头指针而且锁是乐观锁,在标准Disruptor应用中,只有一个生产者,避免了头指针锁的争用。所以我们可以理解Disruptor为无锁队列。

    RingBuffer整个工作流程

    工作流程图
    • 每个RingBuffer是一个环状队列,队列中每个元素可以理解为一个槽。在初始化时,RingBuffer规定了总大小,就是这个环最多可以容纳多少槽。这里Disruptor规定了,RingBuffer大小必须是2的n次方。这里用了一个小技巧,就是将取模转变为取与运算。在内存管理中,我们常用的就是取余定位操作。如果我们想在Ringbuffer定位,一般会用到某个数字对Ringbuffer的大小取余。如果是对2的n次方取余,则可以简化成m % 2^n = m & ( 2^n - 1 )

    • Producer会向这个RingBuffer中填充元素,填充元素的流程是首先从RingBuffer读取下一个Sequence,之后在这个Sequence位置的槽填充数据,之后发布。

    • Consumer消费RingBuffer中的数据,通过SequenceBarrier来协调不同的Consumer的消费先后顺序,以及获取下一个消费位置Sequence。

    • Producer在RingBuffer写满时,会从头开始继续写替换掉以前的数据。但是如果有SequenceBarrier指向下一个位置,则不会覆盖这个位置,阻塞到这个位置被消费完成。Consumer同理,在所有Barrier被消费完之后,会阻塞到有新的数据进来。

    Disruptor例子

    如何使用 Disruptor ,Disruptor 的 API 十分简单,主要有以下几个步骤:

    • 1、定义事件:事件(Event)就是通过 Disruptor 进行交换的数据类型。

    • 2、定义事件工厂:事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory<T>。Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。

    • 3、定义事件处理的具体实现:通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。

    • 4、定义用于事件处理的线程池:Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。

    • 5、指定等待策略:Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。
      Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

    • 6、启动 Disruptor

    • 7、发布事件:Disruptor 的事件发布过程是一个两阶段提交的过程:
        第一步:先从 RingBuffer 获取下一个可以写入的事件的序号;
        第二步:获取对应的事件对象,将数据写入事件对象;
        第三部:将事件提交到 RingBuffer;
      事件只有在提交之后才会通知 EventProcessor 进行处理

    • 8、关闭 Disruptor

    Disruptor使用例子源码

    // step_1 定义事件
    public class LongEvent
    {
        private long value;
    
        public void set(long value)
        {
            this.value = value;
        }
    }
    
    
    // step_2 定义事件工厂
    import com.lmax.disruptor.EventFactory;
    
    public class LongEventFactory implements EventFactory<LongEvent>
    {
        public LongEvent newInstance()
        {
            return new LongEvent();
        }
    }
    
    
    // step_3 定义事件处理的具体实现
    import com.lmax.disruptor.EventHandler;
    
    public class LongEventHandler implements EventHandler<LongEvent>
    {
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
        {
            System.out.println("Event: " + event);
        }
    }
    
    // step_4 定义用于事件处理的线程池
    ExecutorService executor = Executors.newCachedThreadPool();
    
    // 指定等待策略
    WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
    WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
    WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    
    // step_5 启动 Disruptor
    EventFactory<LongEvent> eventFactory = new LongEventFactory();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;
            
    // step_6 发布事件
    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
                    ringBufferSize, executor, ProducerType.SINGLE,
                    new YieldingWaitStrategy());
    EventHandler<LongEvent> eventHandler = new LongEventHandler();
    disruptor.handleEventsWith(eventHandler);
    disruptor.start();
    
    
    // step_7 发布事件;
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    long sequence = ringBuffer.next();//请求下一个事件序号;
        
    try {
        LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;
        long data = getEventData();//获取要通过事件传递的业务数据;
        event.set(data);
    } finally{
        ringBuffer.publish(sequence);//发布事件;
    }
    
    
    // step_8 Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。
    static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
        @Override
        public void translateTo(LongEvent event, long sequence, Long data) {
            event.set(data);
        }    
    }
        
    public static Translator TRANSLATOR = new Translator();
        
    public static void publishEvent2(Disruptor<LongEvent> disruptor) {
        // 发布事件;
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        long data = getEventData();//获取要通过事件传递的业务数据;
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
    
    //  step_9 关闭 Disruptor
    disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
    executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;
    

    Disruptor核心组件源码解析

    Disruptor核心组件 - Disruptor组件

    • RingBuffer<T> ringBuffer用于存储数据对象
    • Executor executor用于保存消费端执行线程service
    • ConsumerRepository保存消费分组信息
    public class Disruptor<T>
    {
        //Disruptor核心变量RingBuffer用于存储变量
        private final RingBuffer<T> ringBuffer;
        // Disruptor用于运行consumer的ExecutorService对象
        private final Executor executor;
        // Disruptor保存的所有消费者信息
        private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
        // 标记是否已经开始
        private final AtomicBoolean started = new AtomicBoolean(false);
        // 标记异常处理的handler
        private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
    
        @Deprecated
        public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
        {
            this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
        }
    
        @Deprecated
        public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final Executor executor,
            final ProducerType producerType,
            final WaitStrategy waitStrategy)
        {
            this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
        }
    
        public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
        {
            this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
        }
    
        public Disruptor(
                final EventFactory<T> eventFactory,
                final int ringBufferSize,
                final ThreadFactory threadFactory,
                final ProducerType producerType,
                final WaitStrategy waitStrategy)
        {
            this(
                RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
                new BasicExecutor(threadFactory));
        }
    
        private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
        {
            this.ringBuffer = ringBuffer;
            this.executor = executor;
        }
    }
    

    Disruptor核心组件 - RingBuffer组件

    • RingBuffer内部通过填充解决内存伪共享问题,没看懂!!
    • Object[] entries用来保存数据
    • int bufferSize环形数组大小
    • Sequencer sequencer 分为多生产者和单生产者两种
    abstract class RingBufferPad
    {
        protected long p1, p2, p3, p4, p5, p6, p7;
    }
    
    abstract class RingBufferFields<E> extends RingBufferPad
    {
        //用于填充的对象引用,为什么填充不知道?
        private static final int BUFFER_PAD;
        //entry存储位置相对与array起始位置的偏移量,用于UNSAFE内存操作时进行寻址,注意这个偏移量加上了用于填充的BUFFER_PAD大小
        private static final long REF_ARRAY_BASE;
        //对应对象引用占用内存大小,计算出来的相对位移数,比如对象引用大小是4byte,那么REF_ELEMENT_SHIFT=2,因为2的2次方=4;
        private static final int REF_ELEMENT_SHIFT;
        private static final Unsafe UNSAFE = Util.getUnsafe();
    
        static
        {
            final int scale = UNSAFE.arrayIndexScale(Object[].class);
            if (4 == scale)
            {
                REF_ELEMENT_SHIFT = 2;
            }
            else if (8 == scale)
            {
                REF_ELEMENT_SHIFT = 3;
            }
            else
            {
                throw new IllegalStateException("Unknown pointer size");
            }
            BUFFER_PAD = 128 / scale;
       
            REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
        }
    
        private final long indexMask;
        // 用于保存Entry的对象数组,也就是RingBuffer当中保存数据的数据结构
        private final Object[] entries;
        // 环形数据库大小
        protected final int bufferSize;
       // RingBuffer当中的sequencer,分为SingleProducerSequencer和MultiProducerSequencer两类
        protected final Sequencer sequencer;
    
        RingBufferFields(
            EventFactory<E> eventFactory,
            Sequencer sequencer)
        {
            this.sequencer = sequencer;
            this.bufferSize = sequencer.getBufferSize();
    
            if (bufferSize < 1)
            {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            }
            if (Integer.bitCount(bufferSize) != 1)
            {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
    
            this.indexMask = bufferSize - 1;
            // 2 * BUFFER_PAD代表头尾都需要增加BUFFER_PAD的空间,所以访问空间需要以数组的起始位置+BUFFER_PAD,当然需要转化为字节
            this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
            fill(eventFactory);
        }
    
        private void fill(EventFactory<E> eventFactory)
        {
            for (int i = 0; i < bufferSize; i++)
            {
                entries[BUFFER_PAD + i] = eventFactory.newInstance();
            }
        }
    
        @SuppressWarnings("unchecked")
        protected final E elementAt(long sequence)
        {
            return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
        }
    }
    
    public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
    {
        public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
        protected long p1, p2, p3, p4, p5, p6, p7;
    
        RingBuffer(
            EventFactory<E> eventFactory,
            Sequencer sequencer)
        {
            super(eventFactory, sequencer);
        }
    }
    

    Disruptor核心组件 - SingleProducerSequencer

    • 单生产者的Sequenecer对象


      SingleProducerSequencer
    public abstract class AbstractSequencer implements Sequencer
    {
        private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
        
        // RingBuffer的大小
        protected final int bufferSize;
        
        // 等待策略
        protected final WaitStrategy waitStrategy;
        
        // 当前RingBuffer对应的油表位置
        protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
        
        //各个消费者持有的取数sequence数组
        protected volatile Sequence[] gatingSequences = new Sequence[0];
    
        public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
        {
            if (bufferSize < 1)
            {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            }
            if (Integer.bitCount(bufferSize) != 1)
            {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
    
            this.bufferSize = bufferSize;
            this.waitStrategy = waitStrategy;
        }
    }
    
    abstract class SingleProducerSequencerPad extends AbstractSequencer
    {
        protected long p1, p2, p3, p4, p5, p6, p7;
    
        public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
        {
            super(bufferSize, waitStrategy);
        }
    }
    
    abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
    {
        public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
        {
            super(bufferSize, waitStrategy);
        }
    
        protected long nextValue = Sequence.INITIAL_VALUE;
        protected long cachedValue = Sequence.INITIAL_VALUE;
    }
    
    // 适用于单生产者的场景,由于没有实现任何栅栏,使用多线程的生产者进行操作并不安全。
    
    public final class SingleProducerSequencer extends SingleProducerSequencerFields
    {
        protected long p1, p2, p3, p4, p5, p6, p7;
    
        public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
        {
            super(bufferSize, waitStrategy);
        }
    }
    

    Sequencer组件-MultiProducerSequencer

    • 多生产者对象MultiProducerSequencer


      MultiProducerSequencer
    public abstract class AbstractSequencer implements Sequencer
    {
        private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
        
        // RingBuffer的大小
        protected final int bufferSize;
        
        // 等待策略
        protected final WaitStrategy waitStrategy;
        
        // 当前RingBuffer对应的油表位置
        protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
        
        //各个消费者持有的取数sequence数组
        protected volatile Sequence[] gatingSequences = new Sequence[0];
    
        public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
        {
            if (bufferSize < 1)
            {
                throw new IllegalArgumentException("bufferSize must not be less than 1");
            }
            if (Integer.bitCount(bufferSize) != 1)
            {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
    
            this.bufferSize = bufferSize;
            this.waitStrategy = waitStrategy;
        }
    }
    
    public final class MultiProducerSequencer extends AbstractSequencer
    {
        private static final Unsafe UNSAFE = Util.getUnsafe();
        private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
        private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
    
        private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    
        private final int[] availableBuffer;
        private final int indexMask;
        private final int indexShift;
    
        public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
        {
            super(bufferSize, waitStrategy);
            availableBuffer = new int[bufferSize];
            indexMask = bufferSize - 1;
            indexShift = Util.log2(bufferSize);
            initialiseAvailableBuffer();
        }
    

    Sequencer组件-Sequence

    • 通过unsafe实现的线程安全对象
    • 负责实现生产者和消费者消费进度的原子技术
    public class Sequence extends RhsPadding
    {
        static final long INITIAL_VALUE = -1L;
        private static final Unsafe UNSAFE;
        private static final long VALUE_OFFSET;
    
        static
        {
            UNSAFE = Util.getUnsafe();
            try
            {
                VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
            }
            catch (final Exception e)
            {
                throw new RuntimeException(e);
            }
        }
    
        public Sequence()
        {
            this(INITIAL_VALUE);
        }
    
        public Sequence(final long initialValue)
        {
            UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
        }
    
        public long get()
        {
            return value;
        }
    
        public void set(final long value)
        {
            UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
        }
    
        public void setVolatile(final long value)
        {
            UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
        }
    
        public boolean compareAndSet(final long expectedValue, final long newValue)
        {
            return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
        }
    
        public long incrementAndGet()
        {
            return addAndGet(1L);
        }
    
        public long addAndGet(final long increment)
        {
            long currentValue;
            long newValue;
    
            do
            {
                currentValue = get();
                newValue = currentValue + increment;
            }
            while (!compareAndSet(currentValue, newValue));
    
            return newValue;
        }
    
        @Override
        public String toString()
        {
            return Long.toString(get());
        }
    }
    

    参考文章

    Disruptor 极速体验
    Disruptor3.0的实现细节
    并发框架Disruptor
    原高并发数据结构Disruptor解析
    Disruptor使用指南

    相关文章

      网友评论

        本文标题:Disruptor - 介绍(1)

        本文链接:https://www.haomeiwen.com/subject/aodabftx.html