美文网首页Netty NIO同步转异步
参考dubbo实现异步转同步方案

参考dubbo实现异步转同步方案

作者: 咦咦咦萨 | 来源:发表于2020-06-22 14:25 被阅读0次

    1. 需求背景

    在支付系统中,因为上游通道的差异化,有时会出现这种情形:

    1. 上游提供的接口是异步的,即上游接收请求后,立即响应处理中(中间状态),最终处理结果以异步通知的形式告知;

    2. 平台对接多家上游机构,有同步响应也有异步响应,为了标准统一,平台对下游统一封装了同步响应接口,方便下游处理业务。

    此时,对于异步响应的通道,平台需要通过异步转同步功能,让下游无感知。

    2. 方案简介

    请求与响应流程

    具体流程见上图:

      1. 客户端同步请求服务端(集群);
    • 2.1 将orderNo/uniqueId作为key/value 放入redis缓存,其中uniqueId用于异步转同步识别请求线程;

    • 2.2 服务端请求第三方服务;

    (中间忽略了同步响应的中间状态响应)

      1. 第三方发起异步响应到服务端集群;

    (此时,不能保证异步响应路由到源请求的节点上)

    • 4.1 按照orderNo查询redis,获取请求的uniqueId;

    • 4.2 按约定组装结果放入MQ中(此处使用RabbitMq演示);

      1. MQ向消费者广播;
    • 5.1 节点接收广播后,识别是该节点发出的请求,则进行响应处理;

    • 5.2 节点接收广播后,识别不是该节点发出的请求,则丢弃该通知;

      1. 在未超时的情况下,同步响应客户端。

    3. 技术要点

    参考Dubbo 2.5.x com.alibaba.dubbo.remoting.exchange.support. DefaultFuture类

    主要使用 ReentrantLock解决互斥问题,使用 Condition 实现超时等待功能。对DefaultFuture做适当的简化之后,示例如下:

    public class DefaultFuture {
    
        private static final Map<String, DefaultFuture> FUTURES  = new ConcurrentHashMap<>();
    
        private final String id;
        private final int timeout;
        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
        private final long start = System.currentTimeMillis();
        private volatile Response response;
        private final Request request;
    
        public DefaultFuture(Request request, int timeout){
            this.id = request.getUniqueId();
            this.timeout = timeout > 0 ? timeout : Constants.DEFAULT_TIMEOUT;
            this.request = request;
            // put into waiting map.
            FUTURES.put(id, this);
        }
    
        /**
         * 阻塞获取响应
         * @return
         * @throws TimeoutException
         */
        public Object get() throws Exception {
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        // 超时等待
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        // 如果有返回结果了,或者,超时了,就退出循环
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                // 如果是超时了,就抛出异常
                if (!isDone()) {
                    throw new TimeoutException(request.getUniqueId());
                }
            }
            // 远程服务正常返回结果,则返回给调用方
            return returnFromResponse();
    
        }
    
        private Object returnFromResponse() throws Exception {
            Response res = response;
            if (res.getStatus() == Response.OK) {
                return response.getResult();
            }
            if (res.getStatus() == Response.TIMEOUT) {
                throw new TimeoutException(res.getErrorMessage());
            }
            FUTURES.remove(id);
            throw new Exception(res.getErrorMessage());
        }
    
        /**
         * 是否响应
         * @return
         */
        private boolean isDone(){
            return this.response != null;
        }
    
        public static void received(Response response) {
            try {
                // 根据请求id从FUTURES中获取DefaultFuture,并删除
                DefaultFuture future = FUTURES.remove(response.getUniqueId());
                if (future != null) {
                    future.doReceived(response);
                } else {
                    log.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response);
                }
            } finally {
            }
        }
    
        private void doReceived(Response response) {
            lock.lock();
            try {
                this.response = response;
                this.response.setStatus(Response.OK);
                if (done != null) {
                    // 唤醒阻塞的线程
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 是否是正确的响应
         * @param id
         * @return
         */
        public static boolean isCorrectResponse(String id){
            return FUTURES.containsKey(id);
        }
        private int getTimeout() {
            return timeout;
        }
    
        private long getStartTimestamp() {
            return start;
        }
    
        public String getId() {
            return id;
        }
    
        private static class RemotingInvocationTimeoutScan implements Runnable {
            @Override
            public void run() {
                while (true) {
                    try {
                        for (DefaultFuture future : FUTURES.values()) {
                            if (future == null || future.isDone()) {
                                continue;
                            }
                            if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                                // create exception response.
                                Response timeoutResponse = new Response(future.getId());
                                // set timeout status.
                                timeoutResponse.setErrorMessage("响应超时");
                                // handle response.
                                DefaultFuture.received(timeoutResponse);
                            }
                        }
                        Thread.sleep(30);
                    } catch (Throwable e) {
                        log.error("Exception when scan the timeout invocation of remoting.", e);
                    }
                }
            }
        }
    
        static {
            // 自动清除超时任务
            Thread th = new Thread(new RemotingInvocationTimeoutScan(), "remoting-invocation-timeout-scan");
            th.setDaemon(true);
            th.start();
        }
    
    }
    

    4. 测试

    1. 测试类接收请求后,直接组装响应,将响应放入mq,然后调用DefaultFuture.get()阻塞等待响应;
    2. mq发送广播,测试项目接收后,按流程通过DefaultFuture.received()处理响应;
    3. DefaultFuture.doReceived()方法中,唤醒等待的请求线程,测试完成。

    测试环境: Macbook Pro 8G + Spring boot(单实例)+ jmeter(共100个线程)

    线程组 TPS RabbitMQ

    相关文章

      网友评论

        本文标题:参考dubbo实现异步转同步方案

        本文链接:https://www.haomeiwen.com/subject/msotfktx.html