Disruptor概述
- 是一个无锁并发框架;
- 能够在一个线程里每秒处理6百万订单;
- 业务逻辑处理器完全在内存中运行,使用事件驱动方式;
- 业务逻辑处理器核心是Disruptor;
- 其比较像一个生产消费模型,可以类比于ArrayBlockingQueue,是个有界的容器;
Disruptor的底层性能为何如此牛掰?
- 数据结构层面:使用环形结构,数组,内存预加载;
- 使用单线程写方式,内存屏障;
- 消除伪共享(填充缓存行);
- 序号栅栏和序号配合使用,消除锁和CAS;
建立Disruptor编程模型四步走
- 建立一个工厂Event类,用于创建Event类实例,Event相当于消息;
- 需要一个监听事件类(相当于消费端),用于处理数据(Event);
- 需要实例化Disruptor实例(相当于容器),配置一些列参数,编写Disruptor核心组件;
- 编写生产者组件,向Disruptor容器中投递数据;
Disruptor示例
Disruptor依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
建立一个工厂Event类
- Event类:相当于传送的消息;
- 如果需要实现读写,需要实现Serializable接口,Disruptor走纯内存,不实现Serializable接口也可以;
/**
* 如果需要实现读写,需要实现Serializable接口;
* Disruptor走纯内存,不实现Serializable接口也可以;
*/
public class OrderEvent {
private long value; //订单的价格
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
- Event工厂类:用于生产Event对象;
- 需要实现com.lmax.disruptor.EventFactory接口;
import com.lmax.disruptor.EventFactory;
public class OrderEventFactory implements EventFactory<OrderEvent>{
/**
* 这个方法就是为了返回空的数据对象(Event)
* @return
*/
public OrderEvent newInstance() {
return new OrderEvent();
}
}
创建一个监听事件类
- 相当于消费者;
- 需要实现com.lmax.disruptor.EventHandler接口;
import com.lmax.disruptor.EventHandler;
public class OrderEventHandler implements EventHandler<OrderEvent>{
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("消费者: " + event.getValue());
}
}
实例化Disruptor实例
- 实例化disruptor对象;
- 添加消费者的监听 (构建disruptor与消费者的一个关联关系);
- 启动disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args) {
// 参数准备工作
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 4;
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 1 eventFactory: 消息(event)工厂对象
* 2 ringBufferSize: 容器的长度
* 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
* 4 ProducerType: 单生产者 还是 多生产者
* 5 waitStrategy: 等待策略
*/
//1. 实例化disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
ringBufferSize,
executor,
ProducerType.SINGLE,
new BlockingWaitStrategy());
//2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
disruptor.handleEventsWith(new OrderEventHandler());
//3. 启动disruptor
disruptor.start();
}
}
编写生产者组件,向Disruptor容器中投递数据
- 生产者组件
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号
long sequence = ringBuffer.next(); //0
try {
//2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
OrderEvent event = ringBuffer.get(sequence);
//3 进行实际的赋值处理
event.setValue(data.getLong(0));
} finally {
//4 提交发布操作
ringBuffer.publish(sequence);
}
}
}
- 生产者组件的创建与使用;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args) {
// 参数准备工作
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 4;
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 1 eventFactory: 消息(event)工厂对象
* 2 ringBufferSize: 容器的长度
* 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
* 4 ProducerType: 单生产者 还是 多生产者
* 5 waitStrategy: 等待策略
*/
//1. 实例化disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
ringBufferSize,
executor,
ProducerType.SINGLE,
new BlockingWaitStrategy());
//2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
disruptor.handleEventsWith(new OrderEventHandler());
//3. 启动disruptor
disruptor.start();
//4. 获取实际存储数据的容器: RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long i = 0 ; i < 5; i ++){
bb.putLong(0, i);
producer.sendData(bb);
}
disruptor.shutdown();
executor.shutdown();
}
}
输出:
消费者: 0
消费者: 1
消费者: 2
消费者: 3
消费者: 4
网友评论