Disruptor是什么
线程间通信的消息组件,类似java里的阻塞队列(BlockingQueue),与BlockingQueue的异同:
- 同:目的相同,都是为了在同一进程的线程间传输数据。
- 异:对消费者多播事件;预分配事件内存;可选无锁
使用场景
- 你是否有这样的应用场景,需要高性能的线程间通信的队列?
- MPSC 多生产单消费
- SPSC 单生产单消费
- SPMC 单生产 多消费
Disruptor的核心组成
- 消息
- 存放消息的容器
- 消息的生产者
- 消息的消费者
Disruptor的核心流程
- 构建Disruptor
1.1 指定ringBuffer的大小,队列的容量是多大
1.2 指定EventFactory - Disruptor中添加消费者
- 启动Disruptor
- producer 往Disruptor中投递消息.
- Disruptor中有消费后,消费者开始消费消息
Disruptor的入门使用
事件类:
public class LongEvent {
private long value;
public void set(long value)
{
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}
事件工厂类:
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
事件消费者
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println("----------------"+ longEvent.toString());
}
}
事件转换器
public class LongEventTranslator implements EventTranslatorOneArg<LongEvent,Long> {
@Override
public void translateTo(LongEvent longEvent, long l, Long aLong) {
longEvent.set(aLong);
}
}
事件生产者
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void publishData(Long aLong) {
EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
ringBuffer.publishEvent(translator, aLong);
}
}
主流程
public static void main(String[] args) throws InterruptedException {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// 事件工厂,用于创建event
LongEventFactory factory = new LongEventFactory();
// 指定ringbuf的大小,必须是2的整数倍
int bufferSize = 1024;
// 构建一个 Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);
// 给disruptor中添加消费者
disruptor.handleEventsWith(new LongEventHandler());
// 启动disruptor
disruptor.start();
//-----------万事俱备,只欠消息(消息的生产者投递消息)
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 跟blockqueue 比对一下
for (long l = 0; l<100_0000; l++)
{
long startAt = System.currentTimeMillis();
producer.publishData(l);
long endAt = System.currentTimeMillis();
System.out.println(endAt-startAt);
//Thread.sleep(1000);
}
}