Disruptor

作者: java_飞 | 来源:发表于2018-12-03 10:16 被阅读20次

    模式

    1.发布订阅模式,同一事件会被多个消费者并行消费
    2.点对点模式,同一事件会被一组消费者其中之一消费
    3.顺序消费;

    使用场景

    低延迟,高吞吐量,有界的缓存队列

    提高吞吐量,减少并发执行上下文之间的延迟并确保可预测延迟

    为什么RingBuffer这么快?

    1.首先是CPU false sharing的解决,Disruptor通过将基本对象填充冗余基本类型变量来填充满整个缓存行,减少false sharing的概率,这部分没怎么看懂,Disruptor通过填充失效这个效果。
    (就是一个缓存行8个变量,预设7个变量,然后再保存一个唯一变量,这样就不会出现相同的变量)

    2.无锁队列的实现,对于传统并发队列,至少要维护两个指针,一个头指针和一个尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁。Disruptor由于是环状队列,对于Producer而言只有头指针而且锁是乐观锁,在标准Disruptor应用中,只有一个生产者,避免了头指针锁的争用。所以我们可以理解Disruptor为无锁队列。

    为什么要用Disruptor?

    锁的成本: 传统阻塞队列使用锁保证线程安全。而锁通过操作系统内核的上下文切换实现,会暂停线程去等待锁直到释放。执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态。这种状态会导致高水平的写入争用。
    伪共享问题导致的性能低下。
    队列是垃圾的重要来源,队列中的元素和用于存储元素的节点对象需要进行频繁的重新分配。

    代码demo

    public class MessageEvent<T> {
        private T message;
    
        public T getMessage() {
            return message;
        }
    
        public void setMessage(T message) {
            this.message = message;
        }
    }
    
    public class MessageEventFactory implements EventFactory<MessageEvent> {
    
        @Override
        public MessageEvent newInstance() {
            return new MessageEvent();
        }
    }
    
    public class MessageEvenHandler3 implements EventHandler<MessageEvent> {
        @Override
        public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
            System.out.println("----------------"+messageEvent.getMessage());
    
        }
    }
    
    public class MessageEventProducer {
    
        private RingBuffer<MessageEvent> ringBuffer;
    
        public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
        public void onData(String message) {
            EventTranslatorOneArg<MessageEvent, String> translator = new MessageEventTranslator();
            ringBuffer.publishEvent(translator, message);
        }
    
    
    }
    
    public class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
    
        @Override
        public void translateTo(MessageEvent messageEvent, long l, String o2) {
                messageEvent.setMessage(o2);
        }
    }
    
    public class MessageExceptionHandler implements ExceptionHandler {
    
        @Override
        public void handleEventException(Throwable throwable, long l, Object o) {
            throwable.printStackTrace();
        }
    
        @Override
        public void handleOnStartException(Throwable throwable) {
            throwable.printStackTrace();
        }
    
        @Override
        public void handleOnShutdownException(Throwable throwable) {
            throwable.printStackTrace();
        }
    }
    
    public class MessageThreadFactory implements ThreadFactory {
    
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"Simple Disruptor Test Thread");
        }
    }
    
    public class MessageConsumer {
    
        public static void main(String[] args) {
            String message = "Hello Disruptor!";
            int ringBufferSize = 1024;//必须是2的N次方
            Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(), ProducerType.SINGLE,new BlockingWaitStrategy());
    //这里用的是单一生成者,如果是多生成者的话是另一种模式,自己的类实现WorkHandler接口,
    //然后这边调用    disruptor.handleEventsWithWorkerPool(new MessageEventHandler());
            disruptor.handleEventsWith(new MessageEvenHandler3());
            disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
            RingBuffer<MessageEvent> ringBuffer = disruptor.start();
            MessageEventProducer producer = new MessageEventProducer(ringBuffer);
            IntStream.range(0,20).forEach(x->{
                producer.onData(x+message);
            });
        }
    }
    

    下面是实现WorkHandler接口的类

    public class MessageEventHandler implements WorkHandler<MessageEvent> {
    
        @Override
        public void onEvent(MessageEvent messageEvent) throws Exception {
            System.out.println(System.currentTimeMillis()+"------我是1号消费者----------"+messageEvent.getMessage());
        }
    }
    
    部分摘自他人的文章,忘记出处了,文章可能有出入,如果有问题,请联系QQ:1107156537

    相关文章

      网友评论

          本文标题:Disruptor

          本文链接:https://www.haomeiwen.com/subject/jxwecqtx.html