1. Disruptor简介
Disruptor是一个用于在线程间通信的高效低延时的消息组件,像个增强的队列,是LMAX 公司在开发金融交易系统中的一个关键创新。
关于Disruptor的相关知识可以从“http://lmax-exchange.github.io/disruptor/”处获取。Disruptor是一个轻量的框架组件,其类图如下图所示:
Disruptor以RingBuffer为核心,发布、处理注册到其上的事件。初识Disruptor,我的感觉是很像nodejs的请求处理框架和windows的窗口过程的处理方式。调用方不断在Disruptor上发布事件,而Disruptor则通过预置的事件处理函数(消费者)来快速处理这些事件。幸运的是,Disruptor提供了多种灵活的消费模式,可用于解决一些常见问题。
本文中,我们希望先通过一个使用Disruptor的应用实例先熟悉Disruptor的使用,再去剖析Disruptor的架构和实现原理。
2.问题的提出
在分布式系统中,经常会出现同一条信令发往多个节点,汇聚这多个节点的响应后,再一并发给调用方的场景。
使用Disruptor的消费者模型,如下图所示:
消费模型
在这个模型中,P1是事件发布(生产)过程,即将要发送的信令发布到Disruptor上,C1和C2是第一层消费者,即将信令发送到不同的节点(C1和C2代表不同节点),C3是第二层消费者,待C1和C2都完成之后才执行,即汇聚来自不同的节点的响应并返回给调用方。
3.代码实现
示例代码见https://github.com/solarkai/Disruptor4MultiResponseExample。
3.1 SignalDisruptorService
实现disruptor的启动,停止,事件注册等操作。
该服务中,定义了等待信令响应的信号量全局MAP,和信令返回响应的全局MAP,如下:
@Getter
// key为消息的唯一标识,value为本条信令对应的信号量
private final ConcurrentHashMap<Long, CountDownLatch> cdlMap = new ConcurrentHashMap<Long, CountDownLatch>();
@Getter
// key为消息的唯一标识,value为本条信令对应的回应列表(来自多网)
private final ConcurrentHashMap<Long, List<SignalResponse>> responseMap = new ConcurrentHashMap<Long, List<SignalResponse>>();
在启动disruptor时,根据配置的节点动态构建第一层消费者(发送信令),代码如下:
if (!isDisruptorStarted) {
Send2NodeEventHandler[] send2NodeEventHandlerArray = new Send2NodeEventHandler[nodeNameList.size()];
for (int i = 0; i < nodeNameList.size(); i++) {
String nodeName = nodeNameList.get(i);
Send2NodeEventHandler handler = new Send2NodeEventHandler();
handler.setNodeName(nodeName);
send2NodeEventHandlerArray[i] = handler;
}
// 设置消费者依赖图
disruptor.handleEventsWith(send2NodeEventHandlerArray).then(rceh);
disruptor.start();
isDisruptorStarted = true;
}
3.2 事件处理定义
信令的请求和响应通过信令的唯一标示ID字段关联。第一层消费者使用Send2NodeEventHandler定义,只完成发送信令到节点的处理,第二层消费者使用ResponseCollectionEventHandler定义,完成信令在各节点响应的汇聚工作。
在disputor上发布一条信令事件的代码如下,发布一条信令后,同时也初始化了该条信令对应的信号量(CountDownLatch):
RingBuffer<SignalSendEvent> rb = disruptor.getRingBuffer();
long sequence = rb.next();
try {
SignalSendEvent event = rb.get(sequence);
event.setDsm(dsm);
// 放入回调函数可访问的map
this.cdlMap.put(dsm.getId(), new CountDownLatch(nodeNameList.size()));
this.responseMap.put(dsm.getId(), new ArrayList<SignalResponse>());
} finally {
rb.publish(sequence);
}
return sequence;
ResponseCollectionEventHandler在此信令对应的信号量上等待,汇聚所有响应,代码如下:
long id = event.getDsm().getId();
// 阻塞等待所有节点对信令的响应
CountDownLatch latch = sds.getCdlMap().get(id);
if (null != latch) {
log.info("start to wait response by id:{}",id);
latch.await(SIGNAL_TIMEOUT, TimeUnit.MILLISECONDS);
// 阻塞结束,处理所有响应
List<SignalResponse> respList = sds.getResponseMap().get(id);
handleRespList(respList);
// 清除资源
sds.getCdlMap().remove(event.getDsm().getId());
sds.getResponseMap().remove(event.getDsm().getId());
} else {
log.error("event not set latch:{}", event);
}
3.3 模拟信令响应
使用SignalDisruptorController中的函数responseSignal模拟节点对信令的响应,代码如下:
@RequestMapping(value = "/responsesignal", method = RequestMethod.POST)
@ApiOperation(value = "siganl响应", notes = "siganl响应")
public long responseSignal(@RequestBody(required = true) SignalResponse sr) {
long id = sr.getId();
CountDownLatch cdl = sds.getCdlMap().get(id);
if (null != cdl) {
cdl.countDown();
}
List<SignalResponse> respList = sds.getResponseMap().get(id);
if (null != respList) {
respList.add(sr);
}
return id;
}
3.4 执行结果
执行该代码,发现在模拟一条信令发布后,disruptor执行第一层消费者,使用3个线程(代码中配了3个节点)分别模拟发送信令过程,第二层消费者(响应汇聚)在第一层消费者都执行完之后再执行,并分配到一个新的线程。
从代码执行打印,可以看到同一个EventHandler一直在同一个线程上执行,因而在编程中需关注某个事件的EventHandler处理时间过长会阻塞后一个事件的处理。
4 小结
通过disruptor的消费依赖图定义,的确简化了多请求响应的代码处理。后面我们会再根据执行结果分析一下Disruptor的实现原理。
网友评论