开篇
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍The LMAX Architecture。同年它还获得了Oracle官方的Duke大奖。其他关于disruptor的背景就不在此多言,可以自己google。
Disruptor性能
disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,本文列出其中一组数据,数据中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue,目测性能只有有5~10倍左右的提升。
完整的官方性能测试数据在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,性能测试的代码已经包含在disruptor的代码中,有兴趣的可以直接过去看看。
Disruptor原理介绍
Disruptor组成元素图Sequence
Sequence是Disruptor最核心的组件,上面已经提到过了。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。那么Sequence是什么呢?首先Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。
RingBuffer
RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。其缺点是在生产者端判断RingBuffer是否已满是需要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。
SequenceBarrier
SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。
SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。
WaitStrategy
当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:
- BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
- BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。
- SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
- YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
- PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。
为什么RingBuffer这么快,首先抛出两个原因:
-
首先是CPU false sharing的解决,Disruptor通过将基本对象填充冗余基本类型变量来填充满整个缓存行,减少false sharing的概率,这部分没怎么看懂,Disruptor通过填充失效这个效果。
-
无锁队列的实现,对于传统并发队列,至少要维护两个指针,一个头指针和一个尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁。Disruptor由于是环状队列,对于Producer而言只有头指针而且锁是乐观锁,在标准Disruptor应用中,只有一个生产者,避免了头指针锁的争用。所以我们可以理解Disruptor为无锁队列。
RingBuffer整个工作流程
工作流程图-
每个RingBuffer是一个环状队列,队列中每个元素可以理解为一个槽。在初始化时,RingBuffer规定了总大小,就是这个环最多可以容纳多少槽。这里Disruptor规定了,RingBuffer大小必须是2的n次方。这里用了一个小技巧,就是将取模转变为取与运算。在内存管理中,我们常用的就是取余定位操作。如果我们想在Ringbuffer定位,一般会用到某个数字对Ringbuffer的大小取余。如果是对2的n次方取余,则可以简化成m % 2^n = m & ( 2^n - 1 )
-
Producer会向这个RingBuffer中填充元素,填充元素的流程是首先从RingBuffer读取下一个Sequence,之后在这个Sequence位置的槽填充数据,之后发布。
-
Consumer消费RingBuffer中的数据,通过SequenceBarrier来协调不同的Consumer的消费先后顺序,以及获取下一个消费位置Sequence。
-
Producer在RingBuffer写满时,会从头开始继续写替换掉以前的数据。但是如果有SequenceBarrier指向下一个位置,则不会覆盖这个位置,阻塞到这个位置被消费完成。Consumer同理,在所有Barrier被消费完之后,会阻塞到有新的数据进来。
Disruptor例子
如何使用 Disruptor ,Disruptor 的 API 十分简单,主要有以下几个步骤:
-
1、定义事件:事件(Event)就是通过 Disruptor 进行交换的数据类型。
-
2、定义事件工厂:事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory<T>。Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。
-
3、定义事件处理的具体实现:通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。
-
4、定义用于事件处理的线程池:Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。
-
5、指定等待策略:Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。
Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。 -
6、启动 Disruptor。
-
7、发布事件:Disruptor 的事件发布过程是一个两阶段提交的过程:
第一步:先从 RingBuffer 获取下一个可以写入的事件的序号;
第二步:获取对应的事件对象,将数据写入事件对象;
第三部:将事件提交到 RingBuffer;
事件只有在提交之后才会通知 EventProcessor 进行处理 -
8、关闭 Disruptor。
Disruptor使用例子源码
// step_1 定义事件
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
// step_2 定义事件工厂
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
// step_3 定义事件处理的具体实现
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
// step_4 定义用于事件处理的线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 指定等待策略
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
// step_5 启动 Disruptor
EventFactory<LongEvent> eventFactory = new LongEventFactory();
ExecutorService executor = Executors.newSingleThreadExecutor();
int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;
// step_6 发布事件
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);
disruptor.start();
// step_7 发布事件;
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//请求下一个事件序号;
try {
LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象;
long data = getEventData();//获取要通过事件传递的业务数据;
event.set(data);
} finally{
ringBuffer.publish(sequence);//发布事件;
}
// step_8 Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。
static class Translator implements EventTranslatorOneArg<LongEvent, Long>{
@Override
public void translateTo(LongEvent event, long sequence, Long data) {
event.set(data);
}
}
public static Translator TRANSLATOR = new Translator();
public static void publishEvent2(Disruptor<LongEvent> disruptor) {
// 发布事件;
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long data = getEventData();//获取要通过事件传递的业务数据;
ringBuffer.publishEvent(TRANSLATOR, data);
}
// step_9 关闭 Disruptor
disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;
Disruptor核心组件源码解析
Disruptor核心组件 - Disruptor组件
- RingBuffer<T> ringBuffer用于存储数据对象
- Executor executor用于保存消费端执行线程service
- ConsumerRepository保存消费分组信息
public class Disruptor<T>
{
//Disruptor核心变量RingBuffer用于存储变量
private final RingBuffer<T> ringBuffer;
// Disruptor用于运行consumer的ExecutorService对象
private final Executor executor;
// Disruptor保存的所有消费者信息
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
// 标记是否已经开始
private final AtomicBoolean started = new AtomicBoolean(false);
// 标记异常处理的handler
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
@Deprecated
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
}
@Deprecated
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
}
Disruptor核心组件 - RingBuffer组件
- RingBuffer内部通过填充解决内存伪共享问题,没看懂!!
- Object[] entries用来保存数据
- int bufferSize环形数组大小
- Sequencer sequencer 分为多生产者和单生产者两种
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad
{
//用于填充的对象引用,为什么填充不知道?
private static final int BUFFER_PAD;
//entry存储位置相对与array起始位置的偏移量,用于UNSAFE内存操作时进行寻址,注意这个偏移量加上了用于填充的BUFFER_PAD大小
private static final long REF_ARRAY_BASE;
//对应对象引用占用内存大小,计算出来的相对位移数,比如对象引用大小是4byte,那么REF_ELEMENT_SHIFT=2,因为2的2次方=4;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
static
{
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
else
{
throw new IllegalStateException("Unknown pointer size");
}
BUFFER_PAD = 128 / scale;
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
private final long indexMask;
// 用于保存Entry的对象数组,也就是RingBuffer当中保存数据的数据结构
private final Object[] entries;
// 环形数据库大小
protected final int bufferSize;
// RingBuffer当中的sequencer,分为SingleProducerSequencer和MultiProducerSequencer两类
protected final Sequencer sequencer;
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
// 2 * BUFFER_PAD代表头尾都需要增加BUFFER_PAD的空间,所以访问空间需要以数组的起始位置+BUFFER_PAD,当然需要转化为字节
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence)
{
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>
{
public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
protected long p1, p2, p3, p4, p5, p6, p7;
RingBuffer(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
}
Disruptor核心组件 - SingleProducerSequencer
-
单生产者的Sequenecer对象
SingleProducerSequencer
public abstract class AbstractSequencer implements Sequencer
{
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
// RingBuffer的大小
protected final int bufferSize;
// 等待策略
protected final WaitStrategy waitStrategy;
// 当前RingBuffer对应的油表位置
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
//各个消费者持有的取数sequence数组
protected volatile Sequence[] gatingSequences = new Sequence[0];
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
}
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
protected long nextValue = Sequence.INITIAL_VALUE;
protected long cachedValue = Sequence.INITIAL_VALUE;
}
// 适用于单生产者的场景,由于没有实现任何栅栏,使用多线程的生产者进行操作并不安全。
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
Sequencer组件-MultiProducerSequencer
-
多生产者对象MultiProducerSequencer
MultiProducerSequencer
public abstract class AbstractSequencer implements Sequencer
{
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
// RingBuffer的大小
protected final int bufferSize;
// 等待策略
protected final WaitStrategy waitStrategy;
// 当前RingBuffer对应的油表位置
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
//各个消费者持有的取数sequence数组
protected volatile Sequence[] gatingSequences = new Sequence[0];
public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
{
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.bufferSize = bufferSize;
this.waitStrategy = waitStrategy;
}
}
public final class MultiProducerSequencer extends AbstractSequencer
{
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final int[] availableBuffer;
private final int indexMask;
private final int indexShift;
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
Sequencer组件-Sequence
- 通过unsafe实现的线程安全对象
- 负责实现生产者和消费者消费进度的原子技术
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
public Sequence()
{
this(INITIAL_VALUE);
}
public Sequence(final long initialValue)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
public long get()
{
return value;
}
public void set(final long value)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
public void setVolatile(final long value)
{
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
public long incrementAndGet()
{
return addAndGet(1L);
}
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}
@Override
public String toString()
{
return Long.toString(get());
}
}
参考文章
Disruptor 极速体验
Disruptor3.0的实现细节
并发框架Disruptor
原高并发数据结构Disruptor解析
Disruptor使用指南
网友评论