1. 急速入门-Disruptor,掌握编程模型
LAMX
- 它能够在一线程里每秒处理6百万订单
- 业务逻辑处理器完全是运行在内存中,使用事件源驱动方式
- 业务逻辑处理器的核心时Disruptor
Disruptor Quick Start 步骤
- 建立一个工厂Event类,用于创建Event类实例对象
@Data
public class OrderEvent {
private long value;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
- 需要有一个监听事件类,用于处理数据(Event类)
public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
System.err.println("消费者:" + orderEvent.getValue());
}
}
- 实例化Disruptor实例,配置一系列参数,编写Disruptor核心组件
public class Main {
public static void main(String[] args) {
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 1024 * 1024;
ThreadFactory springThreadFactory = new CustomizableThreadFactory("springThread-pool-");
/**
* 1 eventFactory: 消息(event)工厂对象
* 2 ringBufferSize:容器的长度
* 3 ProducerType:生产者 还是 多生产者
* 4 waitStrategy:等待策略
*/
//1. 实例化disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
orderEventFactory,
ringBufferSize,
springThreadFactory,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
//2. 添加消费者的监听(disruptor 与消费者的一个关联关系)
disruptor.handleEventsWith(new OrderEventHandler());
//3.启动disruptor
disruptor.start();
//4.获取实际存储数据的容器:RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (int i = 0; i < 100; i++) {
bb.putLong(0,i);
producer.sendData(bb);
}
disruptor.shutdown();
}
}
- 编写生产者组件,向Disruptor容器中去投递数据
public class OrderEventProducer {
private final RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1 在生产者发送消息的时候,首先需要从我们的ringBuffer里面 获取一个可用的序号
long sequence = ringBuffer.next();
try {
//2 根据这个序号,找到具体的“OrderEvent”元素
//注意:此时获取的OrderEvent对象是一个没有被赋值的“空对象”
OrderEvent event = ringBuffer.get(sequence);
//3 进行实际的赋值处理
event.setValue(data.getLong(0));
} finally {
//4 提交发布操作
ringBuffer.publish(sequence);
}
}
}
运行
消费者:0
消费者:1
消费者:2
消费者:3
消费者:4
消费者:5
消费者:6
消费者:7
消费者:8
... 省略
消费者:96
消费者:97
消费者:98
消费者:99
2. Disruptor 核心原理
image.png初看Disruptor,给人的印象就是RingBuffer是其核心,生产者向RingBuffer中写入元素,消费者从RingBuffer中消费元素
RingBuffer是啥?
正如名字所说的一样,它是一个环(收尾相接的环)
它用做在不同上下文(线程)间传递数据的buffer!
RingBuffer拥有一个序号,这个序号指向数组中下一个可用元素
image.png
每次写数据:在Write对应位置写入新值,并向前移动对应的Write指针位置,如果遇到指针已经处于尾部,则移动到最开始位置,形成一个环形, 类似于双向链表。
每次读取数据:在Read位置读取当前值,并移动Read位置,同样如果遇到已经到达尾部,则返回到最开始的初始位置。
整个数据流的读写过程就是通过不断的操作Write和Read来实现数据的高效处理。
3. 扔芝麻与捡芝麻的小故事
Disruptor说的是生产者和消费者的故事
有一个数组:生产者往里面扔芝麻,消费者从里面捡芝麻
但是扔芝麻和捡芝麻也需要考虑速度问题
- 消费者捡的比扔的快,那么消费者要停下来。生产者扔了新的芝麻,然后消费者继续。
- 数组的长度是有限的,生产者到末尾的时候会再从数组的开始位置继续。这个时候可能会追上消费者,消费者还没从那个地方捡走芝麻,这个时候生产者要等待消费者捡走芝麻,然后继续。
- 随着你不停补充buffer(可能会有相应读取),这个序号会一直增长,直到绕过这个环
4.RingBuffer数据结构深入探究
要找到数组中当前序号指向的元素,可以通过mod操作。
sequence mod = array index(取模操作)
sequence :当前数据处理序列号
以上面的RingBuffer为例(java的mod语法):
image.png
例:当前RingBuffer sequence 9, array length为8,
9 % 8 = 1 即第2个位置(0是第1个位置)
网友评论