美文网首页
2.Disruptor核心原理与使用

2.Disruptor核心原理与使用

作者: 香沙小熊 | 来源:发表于2021-05-22 17:18 被阅读0次

1. 急速入门-Disruptor,掌握编程模型

LAMX
  • 它能够在一线程里每秒处理6百万订单
  • 业务逻辑处理器完全是运行在内存中,使用事件源驱动方式
  • 业务逻辑处理器的核心时Disruptor
Disruptor Quick Start 步骤
  1. 建立一个工厂Event类,用于创建Event类实例对象
@Data
public class OrderEvent {
    private long value;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {

        return new OrderEvent();
    }
}
  1. 需要有一个监听事件类,用于处理数据(Event类)
public class OrderEventHandler implements EventHandler<OrderEvent> {

    @Override
    public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {

        System.err.println("消费者:" + orderEvent.getValue());
    }
}
  1. 实例化Disruptor实例,配置一系列参数,编写Disruptor核心组件
public class Main {


    public static void main(String[] args) {


        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 1024 * 1024;
        ThreadFactory springThreadFactory = new CustomizableThreadFactory("springThread-pool-");

        /**
         * 1 eventFactory: 消息(event)工厂对象
         * 2 ringBufferSize:容器的长度
         * 3 ProducerType:生产者 还是 多生产者
         * 4 waitStrategy:等待策略
         */
        //1. 实例化disruptor对象
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
                orderEventFactory,
                ringBufferSize,
                springThreadFactory,
                ProducerType.SINGLE,
                new BlockingWaitStrategy()
        );

        //2. 添加消费者的监听(disruptor 与消费者的一个关联关系)
        disruptor.handleEventsWith(new OrderEventHandler());

        //3.启动disruptor
        disruptor.start();

        //4.获取实际存储数据的容器:RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();

        OrderEventProducer producer =  new OrderEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);

        for (int i = 0; i < 100; i++) {
            bb.putLong(0,i);
            producer.sendData(bb);
        }

        disruptor.shutdown();
    }
}
  1. 编写生产者组件,向Disruptor容器中去投递数据
public class OrderEventProducer {

    private final RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(ByteBuffer data) {

        //1 在生产者发送消息的时候,首先需要从我们的ringBuffer里面 获取一个可用的序号
        long sequence = ringBuffer.next();
        try {
            //2 根据这个序号,找到具体的“OrderEvent”元素
            //注意:此时获取的OrderEvent对象是一个没有被赋值的“空对象”
            OrderEvent event = ringBuffer.get(sequence);
            //3 进行实际的赋值处理
            event.setValue(data.getLong(0));
        } finally {
            //4 提交发布操作
            ringBuffer.publish(sequence);
        }

    }
}

运行

消费者:0
消费者:1
消费者:2
消费者:3
消费者:4
消费者:5
消费者:6
消费者:7
消费者:8
... 省略
消费者:96
消费者:97
消费者:98
消费者:99

2. Disruptor 核心原理

初看Disruptor,给人的印象就是RingBuffer是其核心,生产者向RingBuffer中写入元素,消费者从RingBuffer中消费元素

image.png
RingBuffer是啥?

正如名字所说的一样,它是一个环(收尾相接的环)
它用做在不同上下文(线程)间传递数据的buffer!
RingBuffer拥有一个序号,这个序号指向数组中下一个可用元素


image.png

每次写数据:在Write对应位置写入新值,并向前移动对应的Write指针位置,如果遇到指针已经处于尾部,则移动到最开始位置,形成一个环形, 类似于双向链表。

每次读取数据:在Read位置读取当前值,并移动Read位置,同样如果遇到已经到达尾部,则返回到最开始的初始位置。

整个数据流的读写过程就是通过不断的操作Write和Read来实现数据的高效处理。

3. 扔芝麻与捡芝麻的小故事

Disruptor说的是生产者和消费者的故事
有一个数组:生产者往里面扔芝麻,消费者从里面捡芝麻
但是扔芝麻和捡芝麻也需要考虑速度问题

  • 消费者捡的比扔的快,那么消费者要停下来。生产者扔了新的芝麻,然后消费者继续。
  • 数组的长度是有限的,生产者到末尾的时候会再从数组的开始位置继续。这个时候可能会追上消费者,消费者还没从那个地方捡走芝麻,这个时候生产者要等待消费者捡走芝麻,然后继续。
  • 随着你不停补充buffer(可能会有相应读取),这个序号会一直增长,直到绕过这个环

4.RingBuffer数据结构深入探究

要找到数组中当前序号指向的元素,可以通过mod操作。

sequence mod   = array index(取模操作)
sequence :当前数据处理序列号

以上面的RingBuffer为例(java的mod语法):


image.png

例:当前RingBuffer sequence 9, array length为8,

9 % 8 = 1 即第2个位置(0是第1个位置)
注意:槽的个数是2的N次方,更有利于给予二进制的计算机进行计算。
特别感谢:

阿神

相关文章

网友评论

      本文标题:2.Disruptor核心原理与使用

      本文链接:https://www.haomeiwen.com/subject/gpfbjltx.html