Disruptor

Author: menghuijia | Published: 2018-09-06 10:25
    package com.meng.disruptor.demo;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    import java.nio.ByteBuffer;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    
    public class LongEvent {
    
        private long value;
    
        public void set(long value) {
            this.value = value;
        }
    
        @Override
        public String toString() {
            return "" + value;
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1
            int buffSize = 1024;
            
            // 2
            Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, buffSize, (ThreadFactory) Thread::new, ProducerType.SINGLE, new BlockingWaitStrategy());
            
            // 3
            disruptor.handleEventsWith((e, s, eob) -> System.out.println("[LongEvent1]: " + e), (e, s, eob) -> {
                System.out.println("[LongEvent2]: " + e);
                TimeUnit.MILLISECONDS.sleep(10);
            }).then((e, s, eob) -> System.out.println("[LongEvent3]: " + e));
    
            // 4
            disruptor.start();
            
            // 5
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    
            // 6
            ByteBuffer bb = ByteBuffer.allocate(8);
            
            // 7
            for (long i = 0; i < 10; i++) {
                bb.putLong(0, i);
                // The translator receives bb as its argument a; read from a instead of capturing bb
                ringBuffer.publishEvent((e, s, a) -> e.set(a.getLong(0)), bb);
                TimeUnit.MILLISECONDS.sleep(1);
            }
            
            // 8
            System.out.println("shutdown disruptor");
            disruptor.shutdown();
            System.out.println("disruptor is shutdown");
    
        }
    
    }
    
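    1. Capacity of the RingBuffer; it must be a power of two (2^n).
    2. Create the Disruptor object.
    3. Register the consumers. Consumer 3 handles an event only after both consumer 1 and consumer 2 have finished handling it.
    4. Start the Disruptor.
    5. Get the RingBuffer from the Disruptor.
    6. Create a ByteBuffer backed by 8 bytes of JVM heap memory.
    7. Produce one LongEvent every 1 ms and publish it through the RingBuffer, 10 events in total.
    8. Shut down the Disruptor. The call blocks until all published events have been consumed, then returns.
    Output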
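Note 1 requires a power-of-two capacity. A common reason for this kind of constraint in ring buffers is that an ever-growing sequence number can then be mapped to a slot with a bit mask instead of a modulo. The sketch below illustrates that idea with the JDK only; `RingIndexSketch`, `isPowerOfTwo`, and `slotFor` are hypothetical names used for illustration and are not part of the Disruptor API.

```java
final class RingIndexSketch {

    // True when size is a positive power of two (exactly one bit set).
    static boolean isPowerOfTwo(int size) {
        return size > 0 && Integer.bitCount(size) == 1;
    }

    // Maps a non-negative, ever-increasing sequence number to a slot index.
    // For a power-of-two size, (sequence & (size - 1)) equals (sequence % size).
    static int slotFor(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of two: " + size);
        }
        return (int) (sequence & (size - 1));
    }

    public static void main(String[] args) {
        int buffSize = 1024; // same capacity as in the listing above
        for (long seq : new long[] {0, 1023, 1024, 1025}) {
            System.out.println(seq + " -> slot " + slotFor(seq, buffSize));
        }
    }
}
```

With a capacity of 1024, sequences 1024 and 1025 wrap around to slots 0 and 1, which is the reuse behaviour a ring buffer depends on.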
    [LongEvent1]: 0
    [LongEvent2]: 0
    [LongEvent1]: 1
    [LongEvent1]: 2
    [LongEvent2]: 1
    [LongEvent1]: 3
    [LongEvent3]: 0
    [LongEvent1]: 4
    [LongEvent1]: 5
    [LongEvent1]: 6
    [LongEvent1]: 7
    [LongEvent2]: 2
    [LongEvent1]: 8
    [LongEvent1]: 9
    shutdown disruptor
    [LongEvent2]: 3
    [LongEvent2]: 4
    [LongEvent3]: 1
    [LongEvent3]: 2
    [LongEvent2]: 5
    [LongEvent3]: 3
    [LongEvent2]: 6
    [LongEvent2]: 7
    [LongEvent2]: 8
    [LongEvent2]: 9
    [LongEvent3]: 4
    [LongEvent3]: 5
    [LongEvent3]: 6
    [LongEvent3]: 7
    [LongEvent3]: 8
    [LongEvent3]: 9
    disruptor is shutdown
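The output shows consumer 3 always trailing consumers 1 and 2, and the slower consumer 2 (which sleeps 10 ms per event) setting its pace. One way to picture this is that a dependent consumer may advance only up to the lowest sequence its upstream consumers have completed. The sketch below is a simplified model of that rule using the JDK only, not Disruptor's actual implementation; `GatingSketch` and `highestAvailable` are hypothetical names, and the sequence values are an invented snapshot rather than numbers taken from the run.

```java
final class GatingSketch {

    // Highest sequence a dependent consumer may process: the minimum of the
    // sequences already completed by the consumers it depends on.
    // With no upstream consumers this returns Long.MAX_VALUE (no gating).
    static long highestAvailable(long... upstreamCompleted) {
        long min = Long.MAX_VALUE;
        for (long completed : upstreamCompleted) {
            min = Math.min(min, completed);
        }
        return min;
    }

    public static void main(String[] args) {
        // Hypothetical snapshot: the fast consumer has finished event 9,
        // the slow consumer has only finished event 4.
        long consumer1 = 9;
        long consumer2 = 4;
        System.out.println("consumer 3 may process up to event "
                + highestAvailable(consumer1, consumer2));
    }
}
```

In this model consumer 3 stops at event 4 until the slow consumer catches up, which matches the general shape of the output: all `[LongEvent1]` lines finish early, while `[LongEvent3]` lines keep appearing until the end.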
    
