模式
1.发布订阅模式,同一事件会被多个消费者并行消费
2.点对点模式,同一事件会被一组消费者其中之一消费
3.顺序消费;
使用场景
低延迟,高吞吐量,有界的缓存队列
提高吞吐量,减少并发执行上下文之间的延迟并确保可预测延迟
为什么RingBuffer这么快?
1.首先是CPU false sharing的解决,Disruptor通过将基本对象填充冗余基本类型变量来填充满整个缓存行,减少false sharing的概率,这部分没怎么看懂,Disruptor通过填充失效这个效果。
(就是一个缓存行8个变量,预设7个变量,然后再保存一个唯一变量,这样就不会出现相同的变量)
2.无锁队列的实现,对于传统并发队列,至少要维护两个指针,一个头指针和一个尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁。Disruptor由于是环状队列,对于Producer而言只有头指针而且锁是乐观锁,在标准Disruptor应用中,只有一个生产者,避免了头指针锁的争用。所以我们可以理解Disruptor为无锁队列。
为什么要用Disruptor?
锁的成本: 传统阻塞队列使用锁保证线程安全。而锁通过操作系统内核的上下文切换实现,会暂停线程去等待锁直到释放。执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态。这种状态会导致高水平的写入争用。
伪共享问题导致的性能低下。
队列是垃圾的重要来源,队列中的元素和用于存储元素的节点对象需要进行频繁的重新分配。
代码demo
public class MessageEvent<T> {
private T message;
public T getMessage() {
return message;
}
public void setMessage(T message) {
this.message = message;
}
}
public class MessageEventFactory implements EventFactory<MessageEvent> {
@Override
public MessageEvent newInstance() {
return new MessageEvent();
}
}
public class MessageEvenHandler3 implements EventHandler<MessageEvent> {
@Override
public void onEvent(MessageEvent messageEvent, long l, boolean b) throws Exception {
System.out.println("----------------"+messageEvent.getMessage());
}
}
public class MessageEventProducer {
private RingBuffer<MessageEvent> ringBuffer;
public MessageEventProducer(RingBuffer<MessageEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String message) {
EventTranslatorOneArg<MessageEvent, String> translator = new MessageEventTranslator();
ringBuffer.publishEvent(translator, message);
}
}
public class MessageEventTranslator implements EventTranslatorOneArg<MessageEvent,String> {
@Override
public void translateTo(MessageEvent messageEvent, long l, String o2) {
messageEvent.setMessage(o2);
}
}
public class MessageExceptionHandler implements ExceptionHandler {
@Override
public void handleEventException(Throwable throwable, long l, Object o) {
throwable.printStackTrace();
}
@Override
public void handleOnStartException(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void handleOnShutdownException(Throwable throwable) {
throwable.printStackTrace();
}
}
public class MessageThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"Simple Disruptor Test Thread");
}
}
public class MessageConsumer {
public static void main(String[] args) {
String message = "Hello Disruptor!";
int ringBufferSize = 1024;//必须是2的N次方
Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(new MessageEventFactory(),ringBufferSize,new MessageThreadFactory(), ProducerType.SINGLE,new BlockingWaitStrategy());
//这里用的是单一生成者,如果是多生成者的话是另一种模式,自己的类实现WorkHandler接口,
//然后这边调用 disruptor.handleEventsWithWorkerPool(new MessageEventHandler());
disruptor.handleEventsWith(new MessageEvenHandler3());
disruptor.setDefaultExceptionHandler(new MessageExceptionHandler());
RingBuffer<MessageEvent> ringBuffer = disruptor.start();
MessageEventProducer producer = new MessageEventProducer(ringBuffer);
IntStream.range(0,20).forEach(x->{
producer.onData(x+message);
});
}
}
下面是实现WorkHandler接口的类
public class MessageEventHandler implements WorkHandler<MessageEvent> {
@Override
public void onEvent(MessageEvent messageEvent) throws Exception {
System.out.println(System.currentTimeMillis()+"------我是1号消费者----------"+messageEvent.getMessage());
}
}
网友评论