美文网首页
Distruptor总结

Distruptor总结

作者: 知止9528 | 来源:发表于2019-01-19 14:51 被阅读55次
    Disruptor结构图.png

    disruptor的核心设计思路

    • 使用循环数组的方式代替队列,使用预先填充数据的方式来避免GC
    • 使用CPU缓存行填充的方式来避免极端情况下的数据争用导致的性能下降
    • 多线程编程中尽量避免锁争用的编码技巧

    生成端减少资源的竞争
    多线程在队列也好,循环数组也好,必然存在对标志位的竞争。无论是使用锁来避免竞争,还是使用cas来进行无锁算法。只要争用的情况存在,并且线程较多,都会出现对资源的不断消耗。争用的对象越多,争用中消耗掉的资源也就越多。为了避免这样的情况,减少争用的资源就是一个手段。比如在循环数组中只保留一个标志位,也就是下一个可以写入数据位置的标志位。而尾部标志位则在各个消费者线程中保存(具体的编程手法后续细讲)

    如果存在多个生产者,则可写入的标志位需要用cas算法来进行争夺,避免锁的使用。多个线程通过cas得到唯一的不冲突的下一个可写序号。由于需要获得序号后才能进行写入,而写入完成才可以让消费者线程进行消费。所以才获得序号后,完成写入前,必须有一种方式让消费者检测是否完成。以避免消费者拿到还未填入输入的数组位。

    具体编程手法
    主要的争夺环节集中在多线程发布中,序号大的线程发布需要等到序号小的线程发布完成后才能发布。那我们的优化的点也在这个地方。如果只有一个地方可以写入完成信息,必然需要争夺。为了避免争夺,我们可以使用标志数组(长度和内容数组相同,每一位表示相同下标的内容数组是否发布)来表示每一个位置是否写入。这样就可以避免发布的争夺(大家的标志位都不在一起了)。这也是Disruptor里面的availableSequence

    但是又来带来一个问题,用什么数字来表示是否已经发布完成?如果只是0和1,那么写过1轮以后,标志数组位上就都是1了。又无法区分。所以标志数组上的数字应该在循环数组的每一轮循环的值都不同。比如一开始都是-1,第一轮中是0的表示已发布,第二轮中是0表示没发布,是1的表示已发布。

    Disruptor中的具体处理
    发布的算法步骤
    1.将序号除以标志数组长度(因为长度是2的次方幂,这一步可以通过右移来完成)得到填入值x。(序号每次都会增长,所以每次都应该不同)
    2.将序号和标志数组长度减一进行并运算得到填入位置index。即类似于hash&(length-1)获取填入位置
    4.将x写入index位置。

    消费端减少对锁的使用
    生产者消费者中最容易出现争夺的,采用的优化手段包括

    • 使用循环数组代替队列,使用cas算法来代替锁争夺
    • 消费者各自保存自己当前已经处理过的序号,而不是将这个序号的信息在队列中来存储,避免多线程争用。
    • 生产者线程则需要持有消费者的类的信息,好用来判断所有消费者中消费的最小的序号,以避免在数据写入时覆盖了某个消费者尚未处理的数据信息。(生成端是怎么来获取消费者消费最小的序号的,后面讲)

    伪共享
    这个主要跟计算机的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。

    当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。

    如果不同的线程变量处于同一个缓存行,及时其中一个的缓存数据是最新的,如果另外一个变量需要更新,那么整个缓存行都会更新,具体可以参见计算机MESI协议.

    解决伪共享的一种方法是通过补齐(Padding),使得每一条缓存行只存一个多线程变量
    32位的计算机上一个缓存行是64个字节,而一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量

    CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line

    在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个
    所以一般都是左右补7个。但这种方法是有局限性就是和计算机位数有关系以及jvm可能会对无用的变量做优化导致补齐并不能生效

    解决伪共享的第二种方法就是使用@ Contended注解,这个是java给我们提供的方案


    下面看源码
    单生产者生产数据
    1.申请写入m个元素
    2.若是有m个元素可以写入,则返回最大的序列号。这儿需要注意的是是否会覆盖未读的元素
    3.若是返回正确,则生产者开始写入元素

    public long next(int n)
        {
            if (n < 1)
            {
                throw new IllegalArgumentException("n must be > 0");
            }
    
            long nextValue = this.nextValue; 上一次生产的
    
            long nextSequence = nextValue + n;  
            long wrapPoint = nextSequence - bufferSize;  同一起跑线进行比较
            long cachedGatingSequence = this.cachedValue;
    
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
            {
                cursor.setVolatile(nextValue);  // StoreLoad fence 
    
                long minSequence;
               判断是否覆盖了最慢的消费者
                while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
                {
                    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?   进行阻塞
                }
    
                this.cachedValue = minSequence;  将最慢的消费者缓存起来
            }
    
            this.nextValue = nextSequence;
    
            return nextSequence;
        }
    
    image.png

    多个生产者

    防止多个线程重复写同一个元素

    解决方案:

    • 每个线程获取不同的一段数组空间进行操作,在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可

    防止读取的时候,读到还未写的元素

    解决方案
    引入了一个与Ring Buffer大小相同的buffer
    即位数组available Buffer
    当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功
    读取的时候,会遍历available Buffer,来判断元素是否已经就绪

    代码如下,引入了CAS操作以及原子变量

    public long next(int n)
        {
            if (n < 1)
            {
                throw new IllegalArgumentException("n must be > 0");
            }
    
            long current;
            long next;
    
            do
            {
                current = cursor.get();
                next = current + n;
    
                long wrapPoint = next - bufferSize;
                long cachedGatingSequence = gatingSequenceCache.get();
    
                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
                {
                    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);  通过这个方法获取到消费端最慢的序号
    
                    if (wrapPoint > gatingSequence)
                    {
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }
    
                    gatingSequenceCache.set(gatingSequence);
                }
                else if (cursor.compareAndSet(current, next))
                {
                    break;
                }
            }
            while (true);
    
            return next;
        }
    

    再来看消费者是怎么消费的

    单消费者

    com.lmax.disruptor.dsl.Disruptor.start
     for循环
     ->com.lmax.disruptor.dsl.EventProcessorInfo.start
       ->com.lmax.disruptor.BatchEventProcessor.run
    

    BatchEventProcessor.run()方法如下

    public void run()
        {
            if (running.compareAndSet(IDLE, RUNNING))
            {
                sequenceBarrier.clearAlert();
    
                notifyStart();
                try
                {
                    if (running.get() == RUNNING)
                    {
                        processEvents();
                    }
                }
                finally
                {
                    notifyShutdown();
                    running.set(IDLE);
                }
            }
            else
            {
                // This is a little bit of guess work.  The running state could of changed to HALTED by
                // this point.  However, Java does not have compareAndExchange which is the only way
                // to get it exactly correct.
                if (running.get() == RUNNING)
                {
                    throw new IllegalStateException("Thread is already running");
                }
                else
                {
                    earlyExit();
                }
            }
        }
    

    processEvents()方法如下

    private void processEvents()
        {
            T event = null;
            long nextSequence = sequence.get() + 1L;
    
            while (true)
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);  从位数组中获取可消费的序号
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);   获取做大的可消费序号,批量消费
                    }
    
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);  调用消费者消费的逻辑
                        nextSequence++;
                    }
    
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (running.get() != RUNNING)
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
    

    多消费者情况

    com.lmax.disruptor.dsl.Disruptor.start
     for循环
     ->com.lmax.disruptor.dsl.EventProcessorInfo.start
       ->com.lmax.disruptor.WorkProcessor.run
    

    WorkProcessor.run()方法

    public void run()
        {
            if (!running.compareAndSet(false, true))
            {
                throw new IllegalStateException("Thread is already running");
            }
            sequenceBarrier.clearAlert();
    
            notifyStart();
    
            boolean processedSequence = true;
            long cachedAvailableSequence = Long.MIN_VALUE;
            long nextSequence = sequence.get();  使用了原子变量
            T event = null;
            while (true)
            {
                try
                {
                    // if previous sequence was processed - fetch the next sequence and set
                    // that we have successfully processed the previous sequence
                    // typically, this will be true
                    // this prevents the sequence getting too far forward if an exception
                    // is thrown from the WorkHandler
                  
                    使用CAS来获取可消费的序号
                    if (processedSequence)
                    {
                        processedSequence = false;
                        do
                        {
                            nextSequence = workSequence.get() + 1L;
                            sequence.set(nextSequence - 1L);
                        }
                        while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                    }
    
                    if (cachedAvailableSequence >= nextSequence)
                    {
                        event = ringBuffer.get(nextSequence);
                        workHandler.onEvent(event);
                        processedSequence = true;
                    }
                    else
                    {
                        cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                    }
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    // handle, mark as processed, unless the exception handler threw an exception
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    processedSequence = true;
                }
            }
    
            notifyShutdown();
    
            running.set(false);
        }
    

    所以,单消费者和多消费者的区别是什么?

    单消费者时:

    1.直接获取欲发布的
    long nextSequence = sequence.get() + 1L;
    
    2.满足条件时进行数据处理
    while (nextSequence <= availableSequence)
    {
        event = dataProvider.get(nextSequence);
        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
        nextSequence++;
    }
    
    3.更新序号
    sequence.set(availableSequence);
    

    这里是不需要考虑并发问题的

    多消费者时

    消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。

    1.使用cas操作获取欲发布的
    do
                        {
                            nextSequence = workSequence.get() + 1L;
                            sequence.set(nextSequence - 1L);//备注  直接在这里更新序号了
                        }
                        while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
    
    2.满足条件时进行数据处理
     if (cachedAvailableSequence >= nextSequence)
                    {
                        event = ringBuffer.get(nextSequence);
                        workHandler.onEvent(event);
                        processedSequence = true;
                    }
    
    3.更新序号
    
    

    消费者的等待策略

    名称 措施 适用场景
    BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
    BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
    PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
    SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
    TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
    YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

    相关文章

      网友评论

          本文标题:Distruptor总结

          本文链接:https://www.haomeiwen.com/subject/tbqzdqtx.html