Disruptor入门

作者: Real_man | 来源:发表于2019-03-13 12:12 被阅读59次

    Disruptor使用

    Disruptor是LMAX公司开源的一款高性能的多线程通信库。Java的队列在高并发场景下会带来延迟,而LMAX目标是成为世界上最快的交易平台,也就是系统要有非常低的延迟和很好的吞吐量。为了优化Java队列的延迟问题,LMAX研发了Disruptor。

    Disruptor不是仅仅为金融领域专用的,在解决并发编程的难题上,它都是可以适用的。

    在普通的并发编程中,CPU级别的缓存未命中,还有内核对锁的操作都是有很大的开销,Disruptor则是无锁的。

    实战

    在Disruptor入门中,我们会考虑用一个简单的例子来理解它,从producer中传递一个Long类型的值,然后在consumer中将数字输出。

    引入依赖:

           <dependency>
                <groupId>com.lmax</groupId>
                <artifactId>disruptor</artifactId>
                <version>3.2.0</version>
            </dependency>
    
    1. 首先,定义要携带数据的事件对象。
    public class LongEvent
    {
        private long value;
    
        public void set(long value)
        {
            this.value = value;
        }
    }
    
    
    1. 为了Disrutpor可以预先分配这些对象,还需要创建一个事件工厂。
    import com.lmax.disruptor.EventFactory;
    
    public class LongEventFactory implements EventFactory<LongEvent>
    {
        public LongEvent newInstance()
        {
            return new LongEvent();
        }
    }
    
    1. 当我们的事件工厂创建完毕以后,我们需要创建consume来处理这些事件,本次我们只需要在控制台中输出事件携带的数据就好。
    import com.lmax.disruptor.EventHandler;
    
    public class LongEventHandler implements EventHandler<LongEvent>
    {
        public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
        {
            System.out.println("Event: " + event);
        }
    }
    
    1. 发布事件,在Disruptor 3.0之后,更喜欢使用Event Publisher/Event Translator来发布事件。使用这种方式的好处是,translator代码可以独立出来,并且更加容易进行测试,Disruptor也提供了很多内置的(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg)等,
    public class LongEventProducerWithTranslator {
        private final RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
                new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                    @Override
                    public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                        event.set(bb.getLong(0));
                    }
                };
    
        public void onData(ByteBuffer bb) {
            ringBuffer.publishEvent(TRANSLATOR, bb);
        }
    }
    

    除了使用Translator,还有一种更加常见的方法,但是这种方法在多发布者的场景中,可能会出现一些无法预期的问题,因此还是建议使用Translator。

    // 不建议使用
    @Deprecated
    public class LongEventProducer {
        private final RingBuffer<LongEvent> ringBuffer;
    
        public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
    
        public void onData(ByteBuffer bb) {
            long sequence = ringBuffer.next();  // Grab the next sequence
            try {
                LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                // for the sequence
                event.set(bb.getLong(0));  // Fill with data
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
    
    
    1. 将整个过程串联起来。
    image-20190313102722542
    public class LongEventMain {
        public static void main(String[] args) throws InterruptedException {
            // 构建Disrutor对象
            Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(),
                    1024 * 1024,
                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()),
                    ProducerType.SINGLE,
                    new BusySpinWaitStrategy()
            );
    
            // 设置消费者
            disruptor.handleEventsWith(new LongEventHandler());
    
            // 启动Disrutor
            disruptor.start();
    
    
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
    
            ByteBuffer bb = ByteBuffer.allocate(8);
            for (long l = 0; true; l++)
            {
                bb.putLong(0, l);
                producer.onData(bb);
                Thread.sleep(1000);
            }
    
        }
    }
    
    1. 查看运行结果
    image-20190313100417148

    用法就是上面那些了,还有一些Disrutor的具体使用,可以结合实际的业务场景。

    扩展

    在创建Disrutor对象的时候,需要指定RingBuffer的等待策略,默认的测试是BlockingWaitStrategy,阻塞等待。也就是使用锁,然后等待其它的线程进行唤醒。除了阻塞等待之外,还有一些系统提供的策略,当然也可以自定义等待策略

    • SleepingWaitStrategy 在不需要低延迟的场景下,可以使用它,内部用的是LockSupport.parkNanos。
    • YieldingWaitStrategy 内部使用的是Threa.yield,允许其他的线程先进行,比较推荐的使用方式
    • BusySpinWaitStrategy 一般在实际的消费者数量小于核心线程数的时候使用,因为它会不断的循环,占用CPU资源

    其它用法

    我看到内部的系统再使用Disrutor的时候,在消费事件的时候,将事件类型进行转换,再次异步处理。

    public class AlarmDisruptorConsumer implements WorkHandler<AlarmDisruptorVO> {
    
        @Override
        public void onEvent(AlarmDisruptorVO event) throws Exception {
    
            if (event != null) {
                process(event.getAlarmMsgVO());
                process(event.getAlarmTaskVO());
            }
        }
    
    
        public void process(AlarmMsgVO msgVO) {
    
            if (msgVO == null) {
                return;
            }
            AlarmMsgHandlerService alarmMsgHandlerService = getAlarmMsgHandlerService();
            // 将消息转换为任务
            alarmMsgHandlerService.alarmMsg2Task(msgVO);
        }
    
        public void process(AlarmTaskVO msgVO) {
            if (msgVO == null) {
                return;
            }
            AlarmMsgHandlerService alarmMsgHandlerService = getAlarmMsgHandlerService();
            // 处理任务
            alarmMsgHandlerService.alarmTask2Process( msgVO);
    
        }
    
        private AlarmMsgHandlerService getAlarmMsgHandlerService(){
            return (AlarmMsgHandlerService) SpringApplicationContext.getBean("alarmMsgHandlerService");
        }
    }
    
    

    最后

    关于Disrutor的使用就说到这了,如果JDK内部的并发编程框架不能满足你对性能上的要求,那么这是一个值得尝试的方案。

    参考

    相关文章

      网友评论

        本文标题:Disruptor入门

        本文链接:https://www.haomeiwen.com/subject/dnyzpqtx.html