Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
Disruptor的设计方案
Disruptor通过以下设计来解决队列速度慢的问题:
环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
元素位置定位
数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。
本文只讲怎么使用该框架。更详细说明的可以查看阿里大牛对Disruptor翻译的博客http://ifeve.com/disruptor/
Disruptor实现生产与消费
maven依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
声明一个Event来包含需要传递的数据
/**
* @author: admin
* @description: 定义一个事件通过Disruptor进行交换数据类型
*/
public class StringEvent {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
/**
* @author: admin
* @description: 让disruptor 创建事件
*/
public class StringEventFactory implements EventFactory<StringEvent> {
public StringEvent newInstance() {
return new StringEvent();
}
}
/**
* @author: admin
* @description: 事件消费者,也是是事件处理器
*/
public class StringEventHandler implements EventHandler<StringEvent> {
public void onEvent(StringEvent stringEvent, long l, boolean b) throws Exception {
System.out.println("事件消费者:" + Thread.currentThread().getName() + stringEvent.getValue());
}
}
**
* @author: admin
* @description: 事件生产者
*/
public class StringEventProducer {
public final RingBuffer<StringEvent> ringBuffer;
public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String string) {
//ringBuffer事件队列下一个槽
long sequence = ringBuffer.next();
//取出空的事件队列
StringEvent stringEvent = ringBuffer.get(sequence);
//获取事件传递的数据
stringEvent.setValue(string);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println("生产这准备发送数据");
//4.发布事件
ringBuffer.publish(sequence);
}
}
}
/**
* @author: admin
* @description:
*/
public class DisruptorMain {
public static void main(String[] args) {
//创建一个可缓存的线程池 提供线程来触发消费者事件处理
ExecutorService executorService = Executors.newFixedThreadPool(10);
//创建工厂
StringEventFactory stringEventFactory = new StringEventFactory();
//创建ringBuffer大小
int ringBufferSize = 1024;
//创建disruptor
Disruptor<StringEvent> disruptor = new Disruptor<StringEvent>(stringEventFactory, ringBufferSize, executorService, ProducerType.SINGLE, new YieldingWaitStrategy());
//连接消费端方法
disruptor.handleEventsWith(new StringEventHandler());
//启动
disruptor.start();
//创建RingBuffer 容器
RingBuffer<StringEvent> ringBuffer = disruptor.getRingBuffer();
//创建生产者
StringEventProducer stringEventProducer = new StringEventProducer(ringBuffer);
//指定缓冲池大小
for (int i = 0; i < 100000; i++) {
stringEventProducer.onData("这是第" + i + "条数据");
}
//10.关闭disruptor和executor
disruptor.shutdown();
executorService.shutdown();
}
生产这准备发送数据
事件消费者:这是第671条数据
生产这准备发送数据
事件消费者:这是第672条数据
生产这准备发送数据
事件消费者:这是第673条数据
生产这准备发送数据
事件消费者:这是第674条数据
生产这准备发送数据
事件消费者:这是第675条数据
生产这准备发送数据
事件消费者:这是第676条数据
生产这准备发送数据
事件消费者:这是第677条数据
生产这准备发送数据
事件消费者:这是第678条数据
生产这准备发送数据
事件消费者:这是第679条数据
生产这准备发送数据
事件消费者:这是第680条数据
生产这准备发送数据
事件消费者:这是第681条数据
``
网友评论