美文网首页
Disruptor小结

Disruptor小结

作者: searchworld | 来源:发表于2018-01-23 15:51 被阅读153次

第一次听说Disruptor还是在storm里面,用来做线程间交换数据。后面看了下介绍,感觉有点像黑科技,作为一个类似BlockingQueue的存在,将性能发挥到极致。最近大概翻了下源码,总结下自己的理解。

术语

  1. Ring Buffer: 简单说就是一个预先分配的数组
  2. Sequence: 可以理解为进一步优化的AtomicLong,在RingBuffer中用于维护当前最新的数据的索引;在EventHandler中用于维护自己消费到的索引
  3. Sequencer: 这个是Disruptor的核心,每个RingBuffer对应一个,用于维护游标(cursor),即RingBuffer的Sequence
  4. Sequence Barrier: 由RingBuffer的Sequence产生,每个EventHandler的对应一个,用于确定是否有事件可以处理。
  5. Wait Strategy: 当EventHandler没有数据可以处理的时候的等待逻辑,可以是阻塞等待,如果要延迟低可以直接自旋等待。
  6. EventHandler: Consumer需要实现的接口。
  7. EventProcessor:
  8. Event: EventHandler最终被封装成一个EventProcessor,本身是一个Runnable。
  9. Producer: 自定义实现。

执行流程

Disruptor
  1. Produce过程
    • Producer产生数据
    • 调用RingBuffer.next()中获取一个空闲的sequenceID。代理给Sequencer.next(),更新RingBuffer的Sequence。
    • 从RingBuffer中获取这个sequenceID对应的空的Event
    • 向event中设置第一步产生的数据
    • 调用RingBuffer.publish()发布数据,也是代理给Sequencer.publist(),会唤醒等待的线程。

整个过程就是预先占领位置,填充数据,然后发布,发布之后数据才对Consumer可用

  1. Consume过程
    • 每个EventHandler被封装成一个EventProcessor,在Disruptor.start()的时候开始运行,执行一个死循环
    • 调用SequenceBarrier.waitFor获取RingBuffer当前最新的cursor位置
    • 从当前的sequence开始遍历到上一步获取的最新的cursor,获取Event,调用EventHandler.onEvent进行处理
    • 更新Consumer上的Sequence

黑科技

的执行流程看感觉除了过程比BlockingQueue繁琐外,似乎没有其他特别的地方。但是在实现过程中Disruptor使用了几种优化手段来优化性能。

  1. Memory Barrier (参考 http://ifeve.com/locks-are-bad/http://ifeve.com/disruptor-memory-barrier/
    第一个优化点是尽量使用volatile,不行再使用CAS,避免使用锁。锁会造成上下文切换,比较重;CAS虽然不会造成上下文切换,但是也会锁定缓存行。volatile只需要插入内存屏障即可,效率最高。典型的例子就是在Sequence类中,其父类Value维护一个volatile类型的value。在SingleProducerSequencer的next方法中,由于只有一个Producer,不会产生竞争,直接使用setVolatile,立即对Consumer可见即可;在MultiProducerSequencer中由于有多个Producer来竞争,需要使用compareAndSet。

  2. Cache Line(参考 http://ifeve.com/disruptor-cacheline-padding/
    缓存行一般是64B(也有可能更大),从内存中加载或者写入都是以缓存行为单位。连续的内存地址有可能加载到同一个缓存行,Disruptor利用了这个有点,把数据存放在连续的数组中,加载的时候效率比较高。但是缓存行会带来伪共享的问题,即几个不想关的数据加载到同一个缓存行,其中一个数据需要写内存的时候其他数据也会被写进去。这个问题可以使用缓存行填充来解决。看下面Sequence的代码,在value前面和后面各填充了7个long,保证value值只会跟p1-p15的部分数据在同一个缓存行,避免影响到其他数据。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding
{
  ...
}
  1. 对象预分配
    这个是基于GC的考虑。RingBuffer中数据的值不是每次new一个放进去,而是在初始化的时候就全部创建好,使用的时候只是将值set进去。这样的好处是RingBuffer中的数据进入老年代,不需要频繁gc。

相关文章

网友评论

      本文标题:Disruptor小结

      本文链接:https://www.haomeiwen.com/subject/rjjsaxtx.html