Disruptor

作者: yzn2015 | 来源:发表于2020-04-26 23:28 被阅读0次

    一、简介

    LMAX是一家外汇黄金交易所,Disruptor是由LMAX公司开发的可信消息传递架构的一部分
    以便用非常快速的方法来在多组件之间传递数据。
    核心思想是理解并适应硬件工作方式来达到最优的效果。
    github地址:https://github.com/LMAX-Exchange/disruptor
    LMAX架构:https://martinfowler.com/articles/lmax.html

    二、架构图

    结构图

    三、成员

    sequencer:序列号分配
    sequence:序号,自增不减
    MultiProducerSequencer 多生产者序列分配器
    SingleProducerSequencer 单生产者序列分配器
    ProcessingSequenceBarrier 管理消费者和生产者的依赖关系,
    Ring Buffer:负责存储和更新事件的数据
    Sequence Barrier:由Sequencer生成,它包含此Sequencer发布的Sequence指针以及依赖的其它消费者的Sequence。 它包含为消费者检查是否有可用的事件的代码逻辑。
    Wait Strategy: 消费者等待事件的策略, 这些事件由生产者放入。
    Event:传递的事件,完全有用户定义
    EventProcessor:处理事件的主要循环,包含一个Sequence。有一个具体的实现类BatchEventProcessor.
    EventHandler: 用户实现的接口,代表一个消费者
    Producer:生产者,先获得占位,然后提交事件。

    四、示例

    maven依赖

            <dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.3.6</version>
            </dependency>
    

    消息体:

    import lombok.Data;
    /**
     * Created by yangzaining on 2020-04-06.
     */
    @Data
    public class LongEvent {
        private Long id;
    }
    

    消息工厂

    import com.lmax.disruptor.EventFactory;
    import lombok.Data;
    
    /**
     * Created by yangzaining on 2020-04-06.
     */
    @Data
    public class LongEventFactory implements EventFactory<LongEvent> {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }
    

    消费者

    import com.lmax.disruptor.EventHandler;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * Created by yangzaining on 2020-04-06.
     */
    @Slf4j
    public class LongEventHandler implements EventHandler<LongEvent> {
        @Override
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
            log.info("event = {},sequence = {}, endOfBatch = {}", event.getId(), sequence, endOfBatch);
        }
    }
    
    

    生产者

    package demo;
    
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.util.DaemonThreadFactory;
    
    /**
    * Created by yangzaining on 2020-04-06.
    */
    public class LongEventPusher {
    
       private RingBuffer<LongEvent> ringBuffer;
    
       LongEventPusher(RingBuffer<LongEvent> ringBuffer) {
           this.ringBuffer = ringBuffer;
       }
    
       public void push(Long id) {
           long sequence = ringBuffer.next();
           try {
               LongEvent event = ringBuffer.get(sequence);
               event.setId(id);
           } finally {
               ringBuffer.publish(sequence);
           }
       }
    }
    
    
    
        public static void main(String[] args) throws InterruptedException {
            LongEventFactory factory = new LongEventFactory();
            int bufferSize = 1024;
            Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
            disruptor.handleEventsWith(new LongEventHandler(), new LongEventHandler());
            disruptor.start();
    
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            LongEventPusher pusher = new LongEventPusher(ringBuffer);//java 8可以用pushEvent
            for (long l = 0; l < 100; l++) {
    //            long finalL = l;
    //            disruptor.publishEvent((eventWrapper, sequence) -> eventWrapper.setId(finalL));
                pusher.push(l);
            }
            Thread.sleep(10000);
        }
    
    
    producer.png
        /**
         * @see Sequencer#next(int)
         */
        @Override
        public long next(int n)
        {
            if (n < 1)
            {
                throw new IllegalArgumentException("n must be > 0");
            }
    
            long current;
            long next;
    
            do
            {
                current = cursor.get();
                next = current + n;
    
                long wrapPoint = next - bufferSize;
                long cachedGatingSequence = gatingSequenceCache.get();
    
                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
                {
                    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
    
                    if (wrapPoint > gatingSequence)
                    {
                        waitStrategy.signalAllWhenBlocking();
                        LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                        continue;
                    }
    
                    gatingSequenceCache.set(gatingSequence);
                }
                else if (cursor.compareAndSet(current, next))
                {
                    break;
                }
            }
            while (true);
    
            return next;
        }
    
    consumer.png
               while (true)
                {
                    try
                    {
                        final long availableSequence = sequenceBarrier.waitFor(nextSequence);
    
                        while (nextSequence <= availableSequence)
                        {
                            event = dataProvider.get(nextSequence);
                            eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                            nextSequence++;
                        }
    
                        sequence.set(availableSequence);
                    }
                    catch (final TimeoutException e)
                    {
                        notifyTimeout(sequence.get());
                    }
    
    消费图解.png

    注:以上全凭自己对Disruptor的理解,有不对的地方欢迎指正

    相关文章

      网友评论

          本文标题:Disruptor

          本文链接:https://www.haomeiwen.com/subject/zqywphtx.html