美文网首页
Disruptor介绍及原理讲解

Disruptor介绍及原理讲解

作者: 作妖大鹅鹅 | 来源:发表于2019-08-13 17:42 被阅读0次

    简介

    Disruptor是一个无锁有界内存队列开源框架,最大的特点就是性能非常高高高。很多知名项目都有用到它比如我们熟悉的Log4j 2 .

    本文主要介绍它如何做到高性能,以及具体的框架设计。

    为什么性能这么强大?

    主要是因为有这三个法宝:RingBuffer,无锁设计和缓存行填充。

    RingBuffer

    Disruptor底层采用RingBuffer的结构,RingBuffer大家都知道,就是一个循环使用下标的数组嘛。

    image.png

    计算访问下标的时候,通过取余计算 (cursor % size )来得到数组下标。(一个trick是,当size是2的幂的时候,可以用 cursor & (size - 1) 来快速计算下标。所以Disruptor指定size必须是2的幂。)

    用RingBuffer的好处:

    1. 不需要清数据,用新数据去覆盖旧数据,减少GC
    2. 底层是数组,充分利用缓存

    无锁设计

    怎么做到的?

    在Disruptor中,生产者和消费者有各自的游标,用来指导需要写入或读取的位置。

    消费者对节点的唯一操作是读而不是写,因此不需要加锁。

    只有一个生产者的时候,只需要保证生产者的游标不会超过最慢的消费者一圈(即,不会把消费者还没读完的数据覆盖掉)即可,因此不需要锁。

    当有多个生产者时,Disruptor采用CAS来保证游标的线程安全。在整个复杂的框架中,只有这一个地方出现多线程竞争修改同一个变量值。具体的交互在后面讲。

    (另一方面,用volatile来标记游标,采用内存屏障来代替锁?)

    缓存行填充

    先来了解一个概念,“伪共享”。

    image.png

    我们知道计算机有多级的缓存体系,越靠近CPU的缓存,速度越快,容量越小。而缓存由很多cache line组成,每个cache line通常是64bytes,所以一个cache line通常可以缓存8个long变量。从内存中拉取数据的时候,会把相邻的数据都一起加载到缓存中。在某些情况下这个缓存行优势会失效,导致并发速度反而下降了,这种情况称为伪共享。以下是一个典型的例子:

    假设有两个线程Thread1和Thread2,分别在Core1和Core2上运行。有两个变量head和tail由它们共享,Thread1只读写head, Thread2只读写tail。理想情况下它们不应该有干扰,但是我们可以看到,当Thread1写入head以后,其他Core对应的cache line被都置为失效,也就意味着Core2想要读写tail,需要从内存中重新读取,而这显然是一种浪费。

    image

    我们可以通过缓存行填充来解决这类问题:

    class LhsPadding
    {
        protected long p1, p2, p3, p4, p5, p6, p7;
    }
     
    class Value extends LhsPadding
    {
        protected volatile long value;
    }
     
    class RhsPadding extends Value
    {
        protected long p9, p10, p11, p12, p13, p14, p15;
    }
    
    

    在Disruptor中,游标做了类似的处理。

    在由我们自己定义的Event类中,也值得这样做。如果有不同的生产者往不同的字段写入(?),我们要确保各个字段之间不会出现伪共享。

    具体设计

    直觉上理解一下Disruptor的设计:Disruptor通过对游标的管理,保证任何时候只有一个生产者去写一个槽,就省了很多并发问题;只要好好看好游标就行了。

    消费

    先从比较好理解的消费者讲起。

    7

    在Disruptor中,消费者被称为EventProcessor,通过SequenceBarrier和RingBuffer交互。

    如上图Stage2所示,事件处理器的最大序号是16.它向SequenceBarrier调用waitFor(17)以获得17格中的数据。如果没有数据写入RingBuffer,Stage2事件处理器将挂起等待下一个序号。但是,如上图所示的情况,RingBuffer已经被填充到18格,所以waitFor函数将返回18并通知事件处理器,它可以读取包括直到18格在内的数据,如下图所示。

    6

    你应该已经感觉得到,这样做是怎样有助于平缓延迟的峰值了——以前需要逐个节点地询问“我可以拿下一个数据吗?现在可以了么?现在呢?”,消费者现在只需要简单的说“当你拿到的数字比我这个要大的时候请告诉我”,函数返回值会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(Ring Buffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。这太好了,不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。

    另一个好处是——你可以用多个消费者去读同一个RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor的协调下实现真正的并发数据处理。

    生产

    向RingBuffer写入数据需要经过两阶段提交。

    只有一个发布者时

    首先,发布者必须确定RingBuffer中下一个可以插入的格, RingBuffer持有最近写入格的序号(下图中的18格),从而确定下一个插入格的序号。RingBuffer通过检查所有EventProcessor正在从RingBuffer中读取的当前序号来判断下一个插入格是否空闲,只需要保证下一个插入格已经被所有EventProcessor读取过即可。

    3

    发现了下一个插入格:

    4

    当发布者得到下一个序号后,它可以获得该格中的对象,并可以对该对象进行任意操作。你可以把格想象成一个简单的可以写入任意值的容器。

    同时,在发布者处理19格数据的时候,RingBuffer的序号依然是18,所以其他事件处理器将不会读到19格中的数据。

    对象的改动保存进了RingBuffer:

    5

    最终,发布者将数据写入19格后,通知RingBuffer发布19格的数据。这时,RingBuffer更新序号并且所有从RingBuffer读数据的事件处理器都可以看到19格中的数据。

    总结一下,发布者需要先申请一个可写入的位置,然后再写入然后提交,这是一个明显的两阶段提交设计。

    有多个发布者时

    单生产者的情况比较简单,当有多个生产者时,申请写入位置的时候就会产生竞争。上文说过,Disruptor采用CAS来保证游标的线程安全。直接上源码吧。这段源码应该是最能体现Disruptor核心设计思想的部分了。

    com.lmax.disruptor.MultiProducerSequencer

    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n) //生产者申请分配n个位置
    {
        if (n < 1 || n > bufferSize)
        {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }
     
        long current;
        long next;
     
        do
        {
            current = cursor.get(); // cursor是已经分配的位置的头
            next = current + n; // 待分配位置的头
     
            long wrapPoint = next - bufferSize; // 待分配位置回退一圈的位置。用于和消费者的位置比较,避免把还没有读的位置也分配了
            long cachedGatingSequence = gatingSequenceCache.get(); // 缓存的最慢消费者的位置
     
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) // 如果缓存的最慢消费者的位置不符合条件,则重新进入检查
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
     
                if (wrapPoint > gatingSequence) // 最慢消费者的位置太小,会被生产者覆盖,因此不能完成分配,所以让出CPU。
                {
                    LockSupport.parkNanos(1);
                    continue;
                }
     
                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))  // 设置新的游标位置! 由于CAS的特性,多个生产者同时试图修改current游标的时候,只有一个会成功
                                                            // 其他的会重新进入循环,获取新的游标位置继续尝试申请。
            {
                break;
            }
        }
        while (true);
     
        return next;
    }
    

    进阶

    Disruptor Wizard
    提供了一系列API来设置event handlers,并设置它们之间的依赖关系。

    WaitStrategy
    当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择。

    源码梳理

    3.x的代码,感兴趣的同学可以参考: https://zhanghaoxin.blog.csdn.net/article/category/6121943 ,对源码结构有较详细的讲解。或者参考 Disruptor 3.x源码梳理(简版)

    参考资料

    http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf 论文

    https://tech.meituan.com/2016/11/18/disruptor.html by美团技术团队

    http://ifeve.com/disruptor/ 官网发布的系列文章的译文(比较老了,很多是根据1.x和2.x的讲解的,但是用来参考核心思想是没问题的)

    相关文章

      网友评论

          本文标题:Disruptor介绍及原理讲解

          本文链接:https://www.haomeiwen.com/subject/wpibjctx.html