美文网首页
Distruptor总结

Distruptor总结

作者: 知止9528 | 来源:发表于2019-01-19 14:51 被阅读55次
Disruptor结构图.png

disruptor的核心设计思路

  • 使用循环数组的方式代替队列,使用预先填充数据的方式来避免GC
  • 使用CPU缓存行填充的方式来避免极端情况下的数据争用导致的性能下降
  • 多线程编程中尽量避免锁争用的编码技巧

生成端减少资源的竞争
多线程在队列也好,循环数组也好,必然存在对标志位的竞争。无论是使用锁来避免竞争,还是使用cas来进行无锁算法。只要争用的情况存在,并且线程较多,都会出现对资源的不断消耗。争用的对象越多,争用中消耗掉的资源也就越多。为了避免这样的情况,减少争用的资源就是一个手段。比如在循环数组中只保留一个标志位,也就是下一个可以写入数据位置的标志位。而尾部标志位则在各个消费者线程中保存(具体的编程手法后续细讲)

如果存在多个生产者,则可写入的标志位需要用cas算法来进行争夺,避免锁的使用。多个线程通过cas得到唯一的不冲突的下一个可写序号。由于需要获得序号后才能进行写入,而写入完成才可以让消费者线程进行消费。所以才获得序号后,完成写入前,必须有一种方式让消费者检测是否完成。以避免消费者拿到还未填入输入的数组位。

具体编程手法
主要的争夺环节集中在多线程发布中,序号大的线程发布需要等到序号小的线程发布完成后才能发布。那我们的优化的点也在这个地方。如果只有一个地方可以写入完成信息,必然需要争夺。为了避免争夺,我们可以使用标志数组(长度和内容数组相同,每一位表示相同下标的内容数组是否发布)来表示每一个位置是否写入。这样就可以避免发布的争夺(大家的标志位都不在一起了)。这也是Disruptor里面的availableSequence

但是又来带来一个问题,用什么数字来表示是否已经发布完成?如果只是0和1,那么写过1轮以后,标志数组位上就都是1了。又无法区分。所以标志数组上的数字应该在循环数组的每一轮循环的值都不同。比如一开始都是-1,第一轮中是0的表示已发布,第二轮中是0表示没发布,是1的表示已发布。

Disruptor中的具体处理
发布的算法步骤
1.将序号除以标志数组长度(因为长度是2的次方幂,这一步可以通过右移来完成)得到填入值x。(序号每次都会增长,所以每次都应该不同)
2.将序号和标志数组长度减一进行并运算得到填入位置index。即类似于hash&(length-1)获取填入位置
4.将x写入index位置。

消费端减少对锁的使用
生产者消费者中最容易出现争夺的,采用的优化手段包括

  • 使用循环数组代替队列,使用cas算法来代替锁争夺
  • 消费者各自保存自己当前已经处理过的序号,而不是将这个序号的信息在队列中来存储,避免多线程争用。
  • 生产者线程则需要持有消费者的类的信息,好用来判断所有消费者中消费的最小的序号,以避免在数据写入时覆盖了某个消费者尚未处理的数据信息。(生成端是怎么来获取消费者消费最小的序号的,后面讲)

伪共享
这个主要跟计算机的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。

当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。

如果不同的线程变量处于同一个缓存行,及时其中一个的缓存数据是最新的,如果另外一个变量需要更新,那么整个缓存行都会更新,具体可以参见计算机MESI协议.

解决伪共享的一种方法是通过补齐(Padding),使得每一条缓存行只存一个多线程变量
32位的计算机上一个缓存行是64个字节,而一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量

CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line

在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个
所以一般都是左右补7个。但这种方法是有局限性就是和计算机位数有关系以及jvm可能会对无用的变量做优化导致补齐并不能生效

解决伪共享的第二种方法就是使用@ Contended注解,这个是java给我们提供的方案


下面看源码
单生产者生产数据
1.申请写入m个元素
2.若是有m个元素可以写入,则返回最大的序列号。这儿需要注意的是是否会覆盖未读的元素
3.若是返回正确,则生产者开始写入元素

public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue; 上一次生产的

        long nextSequence = nextValue + n;  
        long wrapPoint = nextSequence - bufferSize;  同一起跑线进行比较
        long cachedGatingSequence = this.cachedValue;

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence 

            long minSequence;
           判断是否覆盖了最慢的消费者
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?   进行阻塞
            }

            this.cachedValue = minSequence;  将最慢的消费者缓存起来
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }
image.png

多个生产者

防止多个线程重复写同一个元素

解决方案:

  • 每个线程获取不同的一段数组空间进行操作,在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可

防止读取的时候,读到还未写的元素

解决方案
引入了一个与Ring Buffer大小相同的buffer
即位数组available Buffer
当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功
读取的时候,会遍历available Buffer,来判断元素是否已经就绪

代码如下,引入了CAS操作以及原子变量

public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();
            next = current + n;

            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);  通过这个方法获取到消费端最慢的序号

                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }

再来看消费者是怎么消费的

单消费者

com.lmax.disruptor.dsl.Disruptor.start
 for循环
 ->com.lmax.disruptor.dsl.EventProcessorInfo.start
   ->com.lmax.disruptor.BatchEventProcessor.run

BatchEventProcessor.run()方法如下

public void run()
    {
        if (running.compareAndSet(IDLE, RUNNING))
        {
            sequenceBarrier.clearAlert();

            notifyStart();
            try
            {
                if (running.get() == RUNNING)
                {
                    processEvents();
                }
            }
            finally
            {
                notifyShutdown();
                running.set(IDLE);
            }
        }
        else
        {
            // This is a little bit of guess work.  The running state could of changed to HALTED by
            // this point.  However, Java does not have compareAndExchange which is the only way
            // to get it exactly correct.
            if (running.get() == RUNNING)
            {
                throw new IllegalStateException("Thread is already running");
            }
            else
            {
                earlyExit();
            }
        }
    }

processEvents()方法如下

private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);  从位数组中获取可消费的序号
                if (batchStartAware != null)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);   获取做大的可消费序号,批量消费
                }

                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);  调用消费者消费的逻辑
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (running.get() != RUNNING)
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

多消费者情况

com.lmax.disruptor.dsl.Disruptor.start
 for循环
 ->com.lmax.disruptor.dsl.EventProcessorInfo.start
   ->com.lmax.disruptor.WorkProcessor.run

WorkProcessor.run()方法

public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();  使用了原子变量
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
              
                使用CAS来获取可消费的序号
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

所以,单消费者和多消费者的区别是什么?

单消费者时:

1.直接获取欲发布的
long nextSequence = sequence.get() + 1L;

2.满足条件时进行数据处理
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

3.更新序号
sequence.set(availableSequence);

这里是不需要考虑并发问题的

多消费者时

消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。

1.使用cas操作获取欲发布的
do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);//备注  直接在这里更新序号了
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));

2.满足条件时进行数据处理
 if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }

3.更新序号


消费者的等待策略

名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

相关文章

网友评论

      本文标题:Distruptor总结

      本文链接:https://www.haomeiwen.com/subject/tbqzdqtx.html