美文网首页Netty NIO同步转异步
参考dubbo实现异步转同步方案

参考dubbo实现异步转同步方案

作者: 咦咦咦萨 | 来源:发表于2020-06-22 14:25 被阅读0次

1. 需求背景

在支付系统中,因为上游通道的差异化,有时会出现这种情形:

  1. 上游提供的接口是异步的,即上游接收请求后,立即响应处理中(中间状态),最终处理结果以异步通知的形式告知;

  2. 平台对接多家上游机构,有同步响应也有异步响应,为了标准统一,平台对下游统一封装了同步响应接口,方便下游处理业务。

此时,对于异步响应的通道,平台需要通过异步转同步功能,让下游无感知。

2. 方案简介

请求与响应流程

具体流程见上图:

    1. 客户端同步请求服务端(集群);
  • 2.1 将orderNo/uniqueId作为key/value 放入redis缓存,其中uniqueId用于异步转同步识别请求线程;

  • 2.2 服务端请求第三方服务;

(中间忽略了同步响应的中间状态响应)

    1. 第三方发起异步响应到服务端集群;

(此时,不能保证异步响应路由到源请求的节点上)

  • 4.1 按照orderNo查询redis,获取请求的uniqueId;

  • 4.2 按约定组装结果放入MQ中(此处使用RabbitMq演示);

    1. MQ向消费者广播;
  • 5.1 节点接收广播后,识别是该节点发出的请求,则进行响应处理;

  • 5.2 节点接收广播后,识别不是该节点发出的请求,则丢弃该通知;

    1. 在未超时的情况下,同步响应客户端。

3. 技术要点

参考Dubbo 2.5.x com.alibaba.dubbo.remoting.exchange.support. DefaultFuture类

主要使用 ReentrantLock解决互斥问题,使用 Condition 实现超时等待功能。对DefaultFuture做适当的简化之后,示例如下:

public class DefaultFuture {

    private static final Map<String, DefaultFuture> FUTURES  = new ConcurrentHashMap<>();

    private final String id;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private final long start = System.currentTimeMillis();
    private volatile Response response;
    private final Request request;

    public DefaultFuture(Request request, int timeout){
        this.id = request.getUniqueId();
        this.timeout = timeout > 0 ? timeout : Constants.DEFAULT_TIMEOUT;
        this.request = request;
        // put into waiting map.
        FUTURES.put(id, this);
    }

    /**
     * 阻塞获取响应
     * @return
     * @throws TimeoutException
     */
    public Object get() throws Exception {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 超时等待
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果有返回结果了,或者,超时了,就退出循环
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 如果是超时了,就抛出异常
            if (!isDone()) {
                throw new TimeoutException(request.getUniqueId());
            }
        }
        // 远程服务正常返回结果,则返回给调用方
        return returnFromResponse();

    }

    private Object returnFromResponse() throws Exception {
        Response res = response;
        if (res.getStatus() == Response.OK) {
            return response.getResult();
        }
        if (res.getStatus() == Response.TIMEOUT) {
            throw new TimeoutException(res.getErrorMessage());
        }
        FUTURES.remove(id);
        throw new Exception(res.getErrorMessage());
    }

    /**
     * 是否响应
     * @return
     */
    private boolean isDone(){
        return this.response != null;
    }

    public static void received(Response response) {
        try {
            // 根据请求id从FUTURES中获取DefaultFuture,并删除
            DefaultFuture future = FUTURES.remove(response.getUniqueId());
            if (future != null) {
                future.doReceived(response);
            } else {
                log.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response);
            }
        } finally {
        }
    }

    private void doReceived(Response response) {
        lock.lock();
        try {
            this.response = response;
            this.response.setStatus(Response.OK);
            if (done != null) {
                // 唤醒阻塞的线程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 是否是正确的响应
     * @param id
     * @return
     */
    public static boolean isCorrectResponse(String id){
        return FUTURES.containsKey(id);
    }
    private int getTimeout() {
        return timeout;
    }

    private long getStartTimestamp() {
        return start;
    }

    public String getId() {
        return id;
    }

    private static class RemotingInvocationTimeoutScan implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setErrorMessage("响应超时");
                            // handle response.
                            DefaultFuture.received(timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    log.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

    static {
        // 自动清除超时任务
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "remoting-invocation-timeout-scan");
        th.setDaemon(true);
        th.start();
    }

}

4. 测试

  1. 测试类接收请求后,直接组装响应,将响应放入mq,然后调用DefaultFuture.get()阻塞等待响应;
  2. mq发送广播,测试项目接收后,按流程通过DefaultFuture.received()处理响应;
  3. DefaultFuture.doReceived()方法中,唤醒等待的请求线程,测试完成。

测试环境: Macbook Pro 8G + Spring boot(单实例)+ jmeter(共100个线程)

线程组 TPS RabbitMQ

相关文章

  • 参考dubbo实现异步转同步方案

    1. 需求背景 在支付系统中,因为上游通道的差异化,有时会出现这种情形: 上游提供的接口是异步的,即上游接收请求后...

  • Java中实现异步转同步的几种方式

    Java中实现异步转同步的几种方式 Android常见的异步转同步的方式是通过Callback + Handler...

  • java 手写并发框架(一)异步查询转同步的 7 种实现方式

    序言 本节将学习一下如何实现异步查询转同步的方式,共计介绍了 7 种常见的实现方式。 思维导图如下: 异步转同步 ...

  • 实现异步转同步

    极客时间-《Java并发编程实战》学习笔记 异步方法:调用方法,在方法中启动子线程异步调用:启动子线程调用方法异步...

  • Dubbo架构原理

    1 Dubbo核心功能 Remoting:远程通讯,提供对多种NIO框架抽象封装,包括“同步转异步”和“请求-响应...

  • Dubbo架构原理

    1 Dubbo核心功能 Remoting:远程通讯,提供对多种NIO框架抽象封装,包括“同步转异步”和“请求-响应...

  • Dubbo使用了CompletableFuture,实现了真异步

    Dubbo在服务调用时支持同步调用和异步调用等方式。 在Dubbo2.6版本及之前的版本在实现异步调用时存在一定的...

  • iOS 多线程基础

    转自:iOS 多线程基础 - 简书 多线程同步和异步的区别?IOS中如何实现多线程的同步? 异步:可以同时执行多条...

  • dubbo之ExtensionLoader调用

    参考 Dubbo实现原理之基于SPI思想实现Dubbo内核

  • Java进阶-Dubbo-进阶

    一、服务调用过程 1.1 服务调用方式   Dubbo 服务调用过程:   Dubbo 支持同步和异步两种调用方式...

网友评论

    本文标题:参考dubbo实现异步转同步方案

    本文链接:https://www.haomeiwen.com/subject/msotfktx.html