一、简介
LMAX是一家外汇黄金交易所,Disruptor是由LMAX公司开发的可信消息传递架构的一部分
以便用非常快速的方法来在多组件之间传递数据。
核心思想是理解并适应硬件工作方式来达到最优的效果。
github地址:https://github.com/LMAX-Exchange/disruptor
LMAX架构:https://martinfowler.com/articles/lmax.html
二、架构图
结构图三、成员
sequencer:序列号分配
sequence:序号,自增不减
MultiProducerSequencer 多生产者序列分配器
SingleProducerSequencer 单生产者序列分配器
ProcessingSequenceBarrier 管理消费者和生产者的依赖关系,
Ring Buffer:负责存储和更新事件的数据
Sequence Barrier:由Sequencer生成,它包含此Sequencer发布的Sequence指针以及依赖的其它消费者的Sequence。 它包含为消费者检查是否有可用的事件的代码逻辑。
Wait Strategy: 消费者等待事件的策略, 这些事件由生产者放入。
Event:传递的事件,完全有用户定义
EventProcessor:处理事件的主要循环,包含一个Sequence。有一个具体的实现类BatchEventProcessor.
EventHandler: 用户实现的接口,代表一个消费者
Producer:生产者,先获得占位,然后提交事件。
四、示例
maven依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
消息体:
import lombok.Data;
/**
* Created by yangzaining on 2020-04-06.
*/
@Data
public class LongEvent {
private Long id;
}
消息工厂
import com.lmax.disruptor.EventFactory;
import lombok.Data;
/**
* Created by yangzaining on 2020-04-06.
*/
@Data
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
消费者
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
/**
* Created by yangzaining on 2020-04-06.
*/
@Slf4j
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("event = {},sequence = {}, endOfBatch = {}", event.getId(), sequence, endOfBatch);
}
}
生产者
package demo;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
/**
* Created by yangzaining on 2020-04-06.
*/
public class LongEventPusher {
private RingBuffer<LongEvent> ringBuffer;
LongEventPusher(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void push(Long id) {
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence);
event.setId(id);
} finally {
ringBuffer.publish(sequence);
}
}
}
public static void main(String[] args) throws InterruptedException {
LongEventFactory factory = new LongEventFactory();
int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler(), new LongEventHandler());
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventPusher pusher = new LongEventPusher(ringBuffer);//java 8可以用pushEvent
for (long l = 0; l < 100; l++) {
// long finalL = l;
// disruptor.publishEvent((eventWrapper, sequence) -> eventWrapper.setId(finalL));
pusher.push(l);
}
Thread.sleep(10000);
}
producer.png
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
consumer.png
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
消费图解.png
注:以上全凭自己对Disruptor的理解,有不对的地方欢迎指正
网友评论