美文网首页并发编程
Disruptor核心(一) 快速上手

Disruptor核心(一) 快速上手

作者: 乌鲁木齐001号程序员 | 来源:发表于2019-04-02 12:24 被阅读119次

    Disruptor概述

    • 是一个无锁并发框架;
    • 能够在一个线程里每秒处理6百万订单;
    • 业务逻辑处理器完全在内存中运行,使用事件驱动方式;
    • 业务逻辑处理器核心是Disruptor;
    • 其比较像一个生产消费模型,可以类比于ArrayBlockingQueue,是个有界的容器;

    Disruptor的底层性能为何如此牛掰?

    • 数据结构层面:使用环形结构,数组,内存预加载;
    • 使用单线程写方式,内存屏障;
    • 消除伪共享(填充缓存行);
    • 序号栅栏和序号配合使用,消除锁和CAS;

    建立Disruptor编程模型四步走

    • 建立一个工厂Event类,用于创建Event类实例,Event相当于消息;
    • 需要一个监听事件类(相当于消费端),用于处理数据(Event);
    • 需要实例化Disruptor实例(相当于容器),配置一些列参数,编写Disruptor核心组件;
    • 编写生产者组件,向Disruptor容器中投递数据;

    Disruptor示例

    Disruptor依赖
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    建立一个工厂Event类
    • Event类:相当于传送的消息;
    • 如果需要实现读写,需要实现Serializable接口,Disruptor走纯内存,不实现Serializable接口也可以;
    /**
     * 如果需要实现读写,需要实现Serializable接口;
     * Disruptor走纯内存,不实现Serializable接口也可以;
     */
    public class OrderEvent {
    
        private long value; //订单的价格
    
        public long getValue() {
            return value;
        }
    
        public void setValue(long value) {
            this.value = value;
        }
        
    }
    
    • Event工厂类:用于生产Event对象;
    • 需要实现com.lmax.disruptor.EventFactory接口;
    import com.lmax.disruptor.EventFactory;
    
    public class OrderEventFactory implements EventFactory<OrderEvent>{
    
        /**
         * 这个方法就是为了返回空的数据对象(Event)
         * @return
         */
        public OrderEvent newInstance() {
            return new OrderEvent();        
        }
    
    }
    
    创建一个监听事件类
    • 相当于消费者;
    • 需要实现com.lmax.disruptor.EventHandler接口;
    import com.lmax.disruptor.EventHandler;
    
    public class OrderEventHandler implements EventHandler<OrderEvent>{
    
        public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
            System.err.println("消费者: " + event.getValue());
        }
    
    }
    
    实例化Disruptor实例
    • 实例化disruptor对象;
    • 添加消费者的监听 (构建disruptor与消费者的一个关联关系);
    • 启动disruptor;
    import java.nio.ByteBuffer;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class Main { 
        
        public static void main(String[] args) {        
            
            // 参数准备工作
            OrderEventFactory orderEventFactory = new OrderEventFactory();
            int ringBufferSize = 4;
            ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            
            /**
             * 1 eventFactory: 消息(event)工厂对象
             * 2 ringBufferSize: 容器的长度
             * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
             * 4 ProducerType: 单生产者 还是 多生产者
             * 5 waitStrategy: 等待策略
             */
            //1. 实例化disruptor对象
            Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
                    ringBufferSize,
                    executor,
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
            
            //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
            disruptor.handleEventsWith(new OrderEventHandler());
            
            //3. 启动disruptor
            disruptor.start();      
        }
    
    }
    
    编写生产者组件,向Disruptor容器中投递数据
    • 生产者组件
    import java.nio.ByteBuffer;
    
    import com.lmax.disruptor.RingBuffer;
    
    public class OrderEventProducer {
    
        private RingBuffer<OrderEvent> ringBuffer;
        
        public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }
        
        public void sendData(ByteBuffer data) {
            //1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号
            long sequence = ringBuffer.next();  //0 
            try {
                //2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
                OrderEvent event = ringBuffer.get(sequence);
                //3 进行实际的赋值处理
                event.setValue(data.getLong(0));            
            } finally {
                //4 提交发布操作
                ringBuffer.publish(sequence);           
            }
        }   
        
    }
    
    • 生产者组件的创建与使用;
    import java.nio.ByteBuffer;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    
    public class Main {
        
        public static void main(String[] args) {        
            
            // 参数准备工作
            OrderEventFactory orderEventFactory = new OrderEventFactory();
            int ringBufferSize = 4;
            ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            
            /**
             * 1 eventFactory: 消息(event)工厂对象
             * 2 ringBufferSize: 容器的长度
             * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
             * 4 ProducerType: 单生产者 还是 多生产者
             * 5 waitStrategy: 等待策略
             */
            //1. 实例化disruptor对象
            Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
                    ringBufferSize,
                    executor,
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
            
            //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
            disruptor.handleEventsWith(new OrderEventHandler());
            
            //3. 启动disruptor
            disruptor.start();
            
            //4. 获取实际存储数据的容器: RingBuffer
            RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
            
            OrderEventProducer producer = new OrderEventProducer(ringBuffer);
            
            ByteBuffer bb = ByteBuffer.allocate(8);
            
            for(long i = 0 ; i < 5; i ++){
                bb.putLong(0, i);
                producer.sendData(bb);
            }
            
            disruptor.shutdown();
            executor.shutdown();        
        }
    
    }
    

    输出:

    消费者: 0
    消费者: 1
    消费者: 2
    消费者: 3
    消费者: 4

    相关文章

      网友评论

        本文标题:Disruptor核心(一) 快速上手

        本文链接:https://www.haomeiwen.com/subject/djrwbqtx.html