美文网首页程序员
使用Disruptor处理多请求响应

使用Disruptor处理多请求响应

作者: 简单是美美 | 来源:发表于2020-01-04 12:51 被阅读0次

1. Disruptor简介

  Disruptor是一个用于在线程间通信的高效低延时的消息组件,像个增强的队列,是LMAX 公司在开发金融交易系统中的一个关键创新。
  关于Disruptor的相关知识可以从“http://lmax-exchange.github.io/disruptor/”处获取。Disruptor是一个轻量的框架组件,其类图如下图所示:

disruptor类图
  Disruptor以RingBuffer为核心,发布、处理注册到其上的事件。初识Disruptor,我的感觉是很像nodejs的请求处理框架和windows的窗口过程的处理方式。调用方不断在Disruptor上发布事件,而Disruptor则通过预置的事件处理函数(消费者)来快速处理这些事件。幸运的是,Disruptor提供了多种灵活的消费模式,可用于解决一些常见问题。
  本文中,我们希望先通过一个使用Disruptor的应用实例先熟悉Disruptor的使用,再去剖析Disruptor的架构和实现原理。

2.问题的提出

  在分布式系统中,经常会出现同一条信令发往多个节点,汇聚这多个节点的响应后,再一并发给调用方的场景。
  使用Disruptor的消费者模型,如下图所示:


消费模型

  在这个模型中,P1是事件发布(生产)过程,即将要发送的信令发布到Disruptor上,C1和C2是第一层消费者,即将信令发送到不同的节点(C1和C2代表不同节点),C3是第二层消费者,待C1和C2都完成之后才执行,即汇聚来自不同的节点的响应并返回给调用方。

3.代码实现

  示例代码见https://github.com/solarkai/Disruptor4MultiResponseExample

3.1 SignalDisruptorService

  实现disruptor的启动,停止,事件注册等操作。
  该服务中,定义了等待信令响应的信号量全局MAP,和信令返回响应的全局MAP,如下:

@Getter
    // key为消息的唯一标识,value为本条信令对应的信号量
    private final ConcurrentHashMap<Long, CountDownLatch> cdlMap = new ConcurrentHashMap<Long, CountDownLatch>();

    @Getter
    // key为消息的唯一标识,value为本条信令对应的回应列表(来自多网)
    private final ConcurrentHashMap<Long, List<SignalResponse>> responseMap = new ConcurrentHashMap<Long, List<SignalResponse>>();

  在启动disruptor时,根据配置的节点动态构建第一层消费者(发送信令),代码如下:

if (!isDisruptorStarted) {
            Send2NodeEventHandler[] send2NodeEventHandlerArray = new Send2NodeEventHandler[nodeNameList.size()];
            for (int i = 0; i < nodeNameList.size(); i++) {
                String nodeName = nodeNameList.get(i);
                Send2NodeEventHandler handler = new Send2NodeEventHandler();
                handler.setNodeName(nodeName);
                send2NodeEventHandlerArray[i] = handler;
            }

            // 设置消费者依赖图
            disruptor.handleEventsWith(send2NodeEventHandlerArray).then(rceh);
            disruptor.start();
            isDisruptorStarted = true;
        }

3.2 事件处理定义

  信令的请求和响应通过信令的唯一标示ID字段关联。第一层消费者使用Send2NodeEventHandler定义,只完成发送信令到节点的处理,第二层消费者使用ResponseCollectionEventHandler定义,完成信令在各节点响应的汇聚工作。
  在disputor上发布一条信令事件的代码如下,发布一条信令后,同时也初始化了该条信令对应的信号量(CountDownLatch):

RingBuffer<SignalSendEvent> rb = disruptor.getRingBuffer();
        long sequence = rb.next();
        try {
            SignalSendEvent event = rb.get(sequence);
            event.setDsm(dsm);

            // 放入回调函数可访问的map
            this.cdlMap.put(dsm.getId(), new CountDownLatch(nodeNameList.size()));
            this.responseMap.put(dsm.getId(), new ArrayList<SignalResponse>());
        } finally {
            rb.publish(sequence);
        }
        return sequence;

  ResponseCollectionEventHandler在此信令对应的信号量上等待,汇聚所有响应,代码如下:

long id = event.getDsm().getId();
        // 阻塞等待所有节点对信令的响应
        CountDownLatch latch = sds.getCdlMap().get(id);
        if (null != latch) {
            log.info("start to wait response by id:{}",id);
            latch.await(SIGNAL_TIMEOUT, TimeUnit.MILLISECONDS);
            // 阻塞结束,处理所有响应
            List<SignalResponse> respList = sds.getResponseMap().get(id);
            handleRespList(respList);
            // 清除资源
            sds.getCdlMap().remove(event.getDsm().getId());
            sds.getResponseMap().remove(event.getDsm().getId());
        } else {
            log.error("event not set latch:{}", event);
        }

3.3 模拟信令响应

  使用SignalDisruptorController中的函数responseSignal模拟节点对信令的响应,代码如下:

@RequestMapping(value = "/responsesignal", method = RequestMethod.POST)
    @ApiOperation(value = "siganl响应", notes = "siganl响应")
    public long responseSignal(@RequestBody(required = true) SignalResponse sr) {
        long id = sr.getId();
        CountDownLatch cdl = sds.getCdlMap().get(id);
        if (null != cdl) {
            cdl.countDown();
        }
        List<SignalResponse> respList = sds.getResponseMap().get(id);
        if (null != respList) {
            respList.add(sr);
        }
        return id;
    }

3.4 执行结果

  执行该代码,发现在模拟一条信令发布后,disruptor执行第一层消费者,使用3个线程(代码中配了3个节点)分别模拟发送信令过程,第二层消费者(响应汇聚)在第一层消费者都执行完之后再执行,并分配到一个新的线程。
  从代码执行打印,可以看到同一个EventHandler一直在同一个线程上执行,因而在编程中需关注某个事件的EventHandler处理时间过长会阻塞后一个事件的处理。

4 小结

  通过disruptor的消费依赖图定义,的确简化了多请求响应的代码处理。后面我们会再根据执行结果分析一下Disruptor的实现原理。

相关文章

  • 使用Disruptor处理多请求响应

    1. Disruptor简介   Disruptor是一个用于在线程间通信的高效低延时的消息组件,像个增强的队列,...

  • java笔记2-servlet-处理请求与响应

    目标 怎么处理请求与响应web容器到底是怎么实现请求与响应的HttpServletRequest处理请求http请...

  • axios拦截器

      页面发送http请求,很多情况我们要对请求和其响应进行特定的处理;如果请求数非常多,单独对每一个请求进行处理会...

  • Nginx的upstream机制概述

    对于使用了upstream的nginx,在处理请求和响应数据的时候是有区别的。 在处理请求数据时,是将请求的数据包...

  • axios拦截器 vue

    页面发送http请求,很多情况我们要对请求和其响应进行特定的处理;如果请求数非常多,单独对每一个请求进行处理会变得...

  • 补习系列-springboot-mime类型处理

    目标 了解http常见的mime类型定义; 如何使用springboot 处理json请求及响应; 如何使用spr...

  • 环境CLOSE_WAIT

    今天测试环境的应用出现一直处理请求状态中1.请求首页无响应,一直处理请求状态 2.使用top查询了进程情况,没有发...

  • Disruptor 实践:整合到现有的爬虫框架

    一. Disruptor Disruptor 是一个高性能的异步处理框架。 Disruptor 是 LMAX 在线...

  • 过滤器

    过滤器 Filter 用于对用户请求进行预处理,和对请求响应进行后处理的 web 应用组件。 过滤源:请求与响应过...

  • 接口响应内容的格式

    服务端成功处理API请求的时候,响应的关键内容有: 处理API请求异常的时候,响应的关键内容有: 所以,将正常响应...

网友评论

    本文标题:使用Disruptor处理多请求响应

    本文链接:https://www.haomeiwen.com/subject/zipxactx.html