美文网首页程序员
使用Disruptor处理多请求响应

使用Disruptor处理多请求响应

作者: 简单是美美 | 来源:发表于2020-01-04 12:51 被阅读0次

    1. Disruptor简介

      Disruptor是一个用于在线程间通信的高效低延时的消息组件,像个增强的队列,是LMAX 公司在开发金融交易系统中的一个关键创新。
      关于Disruptor的相关知识可以从“http://lmax-exchange.github.io/disruptor/”处获取。Disruptor是一个轻量的框架组件,其类图如下图所示:

    disruptor类图
      Disruptor以RingBuffer为核心,发布、处理注册到其上的事件。初识Disruptor,我的感觉是很像nodejs的请求处理框架和windows的窗口过程的处理方式。调用方不断在Disruptor上发布事件,而Disruptor则通过预置的事件处理函数(消费者)来快速处理这些事件。幸运的是,Disruptor提供了多种灵活的消费模式,可用于解决一些常见问题。
      本文中,我们希望先通过一个使用Disruptor的应用实例先熟悉Disruptor的使用,再去剖析Disruptor的架构和实现原理。

    2.问题的提出

      在分布式系统中,经常会出现同一条信令发往多个节点,汇聚这多个节点的响应后,再一并发给调用方的场景。
      使用Disruptor的消费者模型,如下图所示:


    消费模型

      在这个模型中,P1是事件发布(生产)过程,即将要发送的信令发布到Disruptor上,C1和C2是第一层消费者,即将信令发送到不同的节点(C1和C2代表不同节点),C3是第二层消费者,待C1和C2都完成之后才执行,即汇聚来自不同的节点的响应并返回给调用方。

    3.代码实现

      示例代码见https://github.com/solarkai/Disruptor4MultiResponseExample

    3.1 SignalDisruptorService

      实现disruptor的启动,停止,事件注册等操作。
      该服务中,定义了等待信令响应的信号量全局MAP,和信令返回响应的全局MAP,如下:

    @Getter
        // key为消息的唯一标识,value为本条信令对应的信号量
        private final ConcurrentHashMap<Long, CountDownLatch> cdlMap = new ConcurrentHashMap<Long, CountDownLatch>();
    
        @Getter
        // key为消息的唯一标识,value为本条信令对应的回应列表(来自多网)
        private final ConcurrentHashMap<Long, List<SignalResponse>> responseMap = new ConcurrentHashMap<Long, List<SignalResponse>>();
    

      在启动disruptor时,根据配置的节点动态构建第一层消费者(发送信令),代码如下:

    if (!isDisruptorStarted) {
                Send2NodeEventHandler[] send2NodeEventHandlerArray = new Send2NodeEventHandler[nodeNameList.size()];
                for (int i = 0; i < nodeNameList.size(); i++) {
                    String nodeName = nodeNameList.get(i);
                    Send2NodeEventHandler handler = new Send2NodeEventHandler();
                    handler.setNodeName(nodeName);
                    send2NodeEventHandlerArray[i] = handler;
                }
    
                // 设置消费者依赖图
                disruptor.handleEventsWith(send2NodeEventHandlerArray).then(rceh);
                disruptor.start();
                isDisruptorStarted = true;
            }
    

    3.2 事件处理定义

      信令的请求和响应通过信令的唯一标示ID字段关联。第一层消费者使用Send2NodeEventHandler定义,只完成发送信令到节点的处理,第二层消费者使用ResponseCollectionEventHandler定义,完成信令在各节点响应的汇聚工作。
      在disputor上发布一条信令事件的代码如下,发布一条信令后,同时也初始化了该条信令对应的信号量(CountDownLatch):

    RingBuffer<SignalSendEvent> rb = disruptor.getRingBuffer();
            long sequence = rb.next();
            try {
                SignalSendEvent event = rb.get(sequence);
                event.setDsm(dsm);
    
                // 放入回调函数可访问的map
                this.cdlMap.put(dsm.getId(), new CountDownLatch(nodeNameList.size()));
                this.responseMap.put(dsm.getId(), new ArrayList<SignalResponse>());
            } finally {
                rb.publish(sequence);
            }
            return sequence;
    

      ResponseCollectionEventHandler在此信令对应的信号量上等待,汇聚所有响应,代码如下:

    long id = event.getDsm().getId();
            // 阻塞等待所有节点对信令的响应
            CountDownLatch latch = sds.getCdlMap().get(id);
            if (null != latch) {
                log.info("start to wait response by id:{}",id);
                latch.await(SIGNAL_TIMEOUT, TimeUnit.MILLISECONDS);
                // 阻塞结束,处理所有响应
                List<SignalResponse> respList = sds.getResponseMap().get(id);
                handleRespList(respList);
                // 清除资源
                sds.getCdlMap().remove(event.getDsm().getId());
                sds.getResponseMap().remove(event.getDsm().getId());
            } else {
                log.error("event not set latch:{}", event);
            }
    

    3.3 模拟信令响应

      使用SignalDisruptorController中的函数responseSignal模拟节点对信令的响应,代码如下:

    @RequestMapping(value = "/responsesignal", method = RequestMethod.POST)
        @ApiOperation(value = "siganl响应", notes = "siganl响应")
        public long responseSignal(@RequestBody(required = true) SignalResponse sr) {
            long id = sr.getId();
            CountDownLatch cdl = sds.getCdlMap().get(id);
            if (null != cdl) {
                cdl.countDown();
            }
            List<SignalResponse> respList = sds.getResponseMap().get(id);
            if (null != respList) {
                respList.add(sr);
            }
            return id;
        }
    

    3.4 执行结果

      执行该代码,发现在模拟一条信令发布后,disruptor执行第一层消费者,使用3个线程(代码中配了3个节点)分别模拟发送信令过程,第二层消费者(响应汇聚)在第一层消费者都执行完之后再执行,并分配到一个新的线程。
      从代码执行打印,可以看到同一个EventHandler一直在同一个线程上执行,因而在编程中需关注某个事件的EventHandler处理时间过长会阻塞后一个事件的处理。

    4 小结

      通过disruptor的消费依赖图定义,的确简化了多请求响应的代码处理。后面我们会再根据执行结果分析一下Disruptor的实现原理。

    相关文章

      网友评论

        本文标题:使用Disruptor处理多请求响应

        本文链接:https://www.haomeiwen.com/subject/zipxactx.html