disruptor的核心设计思路
- 使用循环数组的方式代替队列,使用预先填充数据的方式来避免GC
- 使用CPU缓存行填充的方式来避免极端情况下的数据争用导致的性能下降
- 多线程编程中尽量避免锁争用的编码技巧
生成端减少资源的竞争
多线程在队列也好,循环数组也好,必然存在对标志位的竞争。无论是使用锁来避免竞争,还是使用cas来进行无锁算法。只要争用的情况存在,并且线程较多,都会出现对资源的不断消耗。争用的对象越多,争用中消耗掉的资源也就越多。为了避免这样的情况,减少争用的资源就是一个手段。比如在循环数组中只保留一个标志位,也就是下一个可以写入数据位置的标志位。而尾部标志位则在各个消费者线程中保存(具体的编程手法后续细讲)
如果存在多个生产者,则可写入的标志位需要用cas算法来进行争夺,避免锁的使用。多个线程通过cas得到唯一的不冲突的下一个可写序号。由于需要获得序号后才能进行写入,而写入完成才可以让消费者线程进行消费。所以才获得序号后,完成写入前,必须有一种方式让消费者检测是否完成。以避免消费者拿到还未填入输入的数组位。
具体编程手法
主要的争夺环节集中在多线程发布中,序号大的线程发布需要等到序号小的线程发布完成后才能发布。那我们的优化的点也在这个地方。如果只有一个地方可以写入完成信息,必然需要争夺。为了避免争夺,我们可以使用标志数组(长度和内容数组相同,每一位表示相同下标的内容数组是否发布)来表示每一个位置是否写入。这样就可以避免发布的争夺(大家的标志位都不在一起了)。这也是Disruptor里面的availableSequence
但是又来带来一个问题,用什么数字来表示是否已经发布完成?如果只是0和1,那么写过1轮以后,标志数组位上就都是1了。又无法区分。所以标志数组上的数字应该在循环数组的每一轮循环的值都不同。比如一开始都是-1,第一轮中是0的表示已发布,第二轮中是0表示没发布,是1的表示已发布。
Disruptor中的具体处理
发布的算法步骤
1.将序号除以标志数组长度(因为长度是2的次方幂,这一步可以通过右移来完成)得到填入值x。(序号每次都会增长,所以每次都应该不同)
2.将序号和标志数组长度减一进行并运算得到填入位置index。即类似于hash&(length-1)获取填入位置
4.将x写入index位置。
消费端减少对锁的使用
生产者消费者中最容易出现争夺的,采用的优化手段包括
- 使用循环数组代替队列,使用cas算法来代替锁争夺
- 消费者各自保存自己当前已经处理过的序号,而不是将这个序号的信息在队列中来存储,避免多线程争用。
- 生产者线程则需要持有消费者的类的信息,好用来判断所有消费者中消费的最小的序号,以避免在数据写入时覆盖了某个消费者尚未处理的数据信息。(生成端是怎么来获取消费者消费最小的序号的,后面讲)
伪共享
这个主要跟计算机的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。
当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。
如果不同的线程变量处于同一个缓存行,及时其中一个的缓存数据是最新的,如果另外一个变量需要更新,那么整个缓存行都会更新,具体可以参见计算机MESI协议.
解决伪共享的一种方法是通过补齐(Padding),使得每一条缓存行只存一个多线程变量
32位的计算机上一个缓存行是64个字节,而一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量
CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line
在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个
所以一般都是左右补7个。但这种方法是有局限性就是和计算机位数有关系以及jvm可能会对无用的变量做优化导致补齐并不能生效
解决伪共享的第二种方法就是使用@ Contended注解,这个是java给我们提供的方案
下面看源码
单生产者生产数据
1.申请写入m个元素
2.若是有m个元素可以写入,则返回最大的序列号。这儿需要注意的是是否会覆盖未读的元素
3.若是返回正确,则生产者开始写入元素
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long nextValue = this.nextValue; 上一次生产的
long nextSequence = nextValue + n;
long wrapPoint = nextSequence - bufferSize; 同一起跑线进行比较
long cachedGatingSequence = this.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
判断是否覆盖了最慢的消费者
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? 进行阻塞
}
this.cachedValue = minSequence; 将最慢的消费者缓存起来
}
this.nextValue = nextSequence;
return nextSequence;
}
image.png
多个生产者
防止多个线程重复写同一个元素
解决方案:
- 每个线程获取不同的一段数组空间进行操作,在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可
防止读取的时候,读到还未写的元素
解决方案
引入了一个与Ring Buffer大小相同的buffer
即位数组available Buffer
当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功
读取的时候,会遍历available Buffer,来判断元素是否已经就绪
代码如下,引入了CAS操作以及原子变量
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current); 通过这个方法获取到消费端最慢的序号
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
再来看消费者是怎么消费的
单消费者
com.lmax.disruptor.dsl.Disruptor.start
for循环
->com.lmax.disruptor.dsl.EventProcessorInfo.start
->com.lmax.disruptor.BatchEventProcessor.run
BatchEventProcessor.run()方法如下
public void run()
{
if (running.compareAndSet(IDLE, RUNNING))
{
sequenceBarrier.clearAlert();
notifyStart();
try
{
if (running.get() == RUNNING)
{
processEvents();
}
}
finally
{
notifyShutdown();
running.set(IDLE);
}
}
else
{
// This is a little bit of guess work. The running state could of changed to HALTED by
// this point. However, Java does not have compareAndExchange which is the only way
// to get it exactly correct.
if (running.get() == RUNNING)
{
throw new IllegalStateException("Thread is already running");
}
else
{
earlyExit();
}
}
}
processEvents()方法如下
private void processEvents()
{
T event = null;
long nextSequence = sequence.get() + 1L;
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence); 从位数组中获取可消费的序号
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1); 获取做大的可消费序号,批量消费
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); 调用消费者消费的逻辑
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
多消费者情况
com.lmax.disruptor.dsl.Disruptor.start
for循环
->com.lmax.disruptor.dsl.EventProcessorInfo.start
->com.lmax.disruptor.WorkProcessor.run
WorkProcessor.run()方法
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get(); 使用了原子变量
T event = null;
while (true)
{
try
{
// if previous sequence was processed - fetch the next sequence and set
// that we have successfully processed the previous sequence
// typically, this will be true
// this prevents the sequence getting too far forward if an exception
// is thrown from the WorkHandler
使用CAS来获取可消费的序号
if (processedSequence)
{
processedSequence = false;
do
{
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
}
else
{
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// handle, mark as processed, unless the exception handler threw an exception
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}
所以,单消费者和多消费者的区别是什么?
单消费者时:
1.直接获取欲发布的
long nextSequence = sequence.get() + 1L;
2.满足条件时进行数据处理
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
3.更新序号
sequence.set(availableSequence);
这里是不需要考虑并发问题的
多消费者时
消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。
1.使用cas操作获取欲发布的
do
{
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);//备注 直接在这里更新序号了
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
2.满足条件时进行数据处理
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
}
3.更新序号
消费者的等待策略
名称 | 措施 | 适用场景 |
---|---|---|
BlockingWaitStrategy | 加锁 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
BusySpinWaitStrategy | 自旋 | 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用 |
PhasedBackoffWaitStrategy | 自旋 + yield + 自定义策略 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
SleepingWaitStrategy | 自旋 + yield + sleep | 性能和CPU资源之间有很好的折中。延迟不均匀 |
TimeoutBlockingWaitStrategy | 加锁,有超时限制 | CPU资源紧缺,吞吐量和延迟并不重要的场景 |
YieldingWaitStrategy | 自旋 + yield + 自旋 | 性能和CPU资源之间有很好的折中。延迟比较均匀 |
网友评论