第一次听说Disruptor
还是在storm里面,用来做线程间交换数据。后面看了下介绍,感觉有点像黑科技,作为一个类似BlockingQueue
的存在,将性能发挥到极致。最近大概翻了下源码,总结下自己的理解。
术语
- Ring Buffer: 简单说就是一个预先分配的数组
-
Sequence: 可以理解为进一步优化的
AtomicLong
,在RingBuffer中用于维护当前最新的数据的索引;在EventHandler中用于维护自己消费到的索引 - Sequencer: 这个是Disruptor的核心,每个RingBuffer对应一个,用于维护游标(cursor),即RingBuffer的Sequence
- Sequence Barrier: 由RingBuffer的Sequence产生,每个EventHandler的对应一个,用于确定是否有事件可以处理。
- Wait Strategy: 当EventHandler没有数据可以处理的时候的等待逻辑,可以是阻塞等待,如果要延迟低可以直接自旋等待。
- EventHandler: Consumer需要实现的接口。
- EventProcessor:
- Event: EventHandler最终被封装成一个EventProcessor,本身是一个Runnable。
- Producer: 自定义实现。
执行流程
Disruptor- Produce过程
- Producer产生数据
- 调用RingBuffer.next()中获取一个空闲的sequenceID。代理给Sequencer.next(),更新RingBuffer的Sequence。
- 从RingBuffer中获取这个sequenceID对应的空的Event
- 向event中设置第一步产生的数据
- 调用RingBuffer.publish()发布数据,也是代理给Sequencer.publist(),会唤醒等待的线程。
整个过程就是预先占领位置,填充数据,然后发布,发布之后数据才对Consumer可用
- Consume过程
- 每个EventHandler被封装成一个EventProcessor,在Disruptor.start()的时候开始运行,执行一个死循环
- 调用SequenceBarrier.waitFor获取RingBuffer当前最新的cursor位置
- 从当前的sequence开始遍历到上一步获取的最新的cursor,获取Event,调用EventHandler.onEvent进行处理
- 更新Consumer上的Sequence
黑科技
的执行流程看感觉除了过程比BlockingQueue繁琐外,似乎没有其他特别的地方。但是在实现过程中Disruptor使用了几种优化手段来优化性能。
-
Memory Barrier (参考 http://ifeve.com/locks-are-bad/ 和 http://ifeve.com/disruptor-memory-barrier/)
第一个优化点是尽量使用volatile,不行再使用CAS,避免使用锁。锁会造成上下文切换,比较重;CAS虽然不会造成上下文切换,但是也会锁定缓存行。volatile只需要插入内存屏障即可,效率最高。典型的例子就是在Sequence类中,其父类Value维护一个volatile类型的value。在SingleProducerSequencer
的next方法中,由于只有一个Producer,不会产生竞争,直接使用setVolatile
,立即对Consumer可见即可;在MultiProducerSequencer
中由于有多个Producer来竞争,需要使用compareAndSet。 -
Cache Line(参考 http://ifeve.com/disruptor-cacheline-padding/)
缓存行一般是64B(也有可能更大),从内存中加载或者写入都是以缓存行为单位。连续的内存地址有可能加载到同一个缓存行,Disruptor利用了这个有点,把数据存放在连续的数组中,加载的时候效率比较高。但是缓存行会带来伪共享的问题,即几个不想关的数据加载到同一个缓存行,其中一个数据需要写内存的时候其他数据也会被写进去。这个问题可以使用缓存行填充来解决。看下面Sequence的代码,在value前面和后面各填充了7个long,保证value值只会跟p1-p15的部分数据在同一个缓存行,避免影响到其他数据。
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
{
...
}
- 对象预分配
这个是基于GC的考虑。RingBuffer中数据的值不是每次new一个放进去,而是在初始化的时候就全部创建好,使用的时候只是将值set进去。这样的好处是RingBuffer中的数据进入老年代,不需要频繁gc。
网友评论