美文网首页dubbo
Dubbo 异步调用过程

Dubbo 异步调用过程

作者: 晴天哥_王志 | 来源:发表于2020-01-12 15:15 被阅读0次

    开篇

    • 本篇是基于Dubbo-2.6.7版本的异步调用的分析,在这个过程中会涉及异步的调用过程和响应过程的分析。
    • 文章中会有一部分简单的例子,用于讲解异步的是使用方式。

    异步调用说明

    Dubbo 异步调用过程
    • 关注userThread的行为,用户发出调用后,IOThread会在上下文RpcContext中设置Future,对应上图中步骤1.2.3。
    • 用户从RpcContext中取得Future,然后wait这个Future其它的事情都由IOThread完成,对应上图中步骤4.5。
    • server端响应后会把调用结果设置在RpcContext上下文当中,同时通知UserThread线程。

    异步回调使用案例

    <dubbo:reference id="fooService" interface="com.alibaba.foo.FooService">
          <dubbo:method name="findFoo" async="true" />
    </dubbo:reference>
    <dubbo:reference id="barService" interface="com.alibaba.bar.BarService">
          <dubbo:method name="findBar" async="true" />
    </dubbo:reference>
    
    // 此方法应该返回Foo,但异步后会立刻返回NULL
    fooService.findFoo(fooId);
    // 立刻得到当前调用的Future实例,当发生新的调用时这个东西将会被覆盖
    Future<Foo> fooFuture = RpcContext.getContext().getFuture();
     
    // 调用另一个服务的方法
    barService.findBar(barId);
    // 立刻得到当前调用的Future
    Future<Bar> barFuture = RpcContext.getContext().getFuture();
     
    // 此时,两个服务的方法在并发执行
    // 等待第一个调用完成,线程会进入Sleep状态,当调用完成后被唤醒。
    Foo foo = fooFuture.get();
    // 同上
    Bar bar = barFuture.get();
    // 假如第一个调用需要等待5秒,第二个等待6秒,则整个调用过程完成的时间是6秒。
    
    • 1、异步调用的实现步骤先执行fooService.findFoo()的执行服务调用。
    • 2、获取RPC上下文RpcContext.getContext().getFuture()。
    • 3、通过future.get()方法获取执行结果,如果当时没有结果当前线程就会被挂起。

    Dubbo异步调用栈

    • Consumer => InvokerInvocationHandler =>DubboInvoker =>HeaderExchangeClient。

    InvokerInvocationHandler

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    
    }
    
    • InvokerInvocationHandler的invoke()方法创建rpc调用的RpcInvocation对象,这个对象会在单次调用的过程中传递,相当于单次调用的上下文。

    RpcInvocation

    public class RpcInvocation implements Invocation, Serializable {
    
        private static final long serialVersionUID = -4355285085441097045L;
        // 方法名字
        private String methodName;
        // 参数类型
        private Class<?>[] parameterTypes;
        // 参数值
        private Object[] arguments;
        // 上下文透传的参数值
        private Map<String, String> attachments;
    
        private transient Invoker<?> invoker;
    
        public RpcInvocation() {
        }
    
        public RpcInvocation(Method method, Object[] arguments) {
            this(method.getName(), method.getParameterTypes(), arguments, null, null);
        }
    
        public RpcInvocation(Method method, Object[] arguments, Map<String, String> attachment) {
            this(method.getName(), method.getParameterTypes(), arguments, attachment, null);
        }
    
        public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
            this(methodName, parameterTypes, arguments, null, null);
        }
    
        public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments) {
            this(methodName, parameterTypes, arguments, attachments, null);
        }
    
        public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;
            this.arguments = arguments == null ? new Object[0] : arguments;
            this.attachments = attachments == null ? new HashMap<String, String>() : attachments;
            this.invoker = invoker;
        }
    }
    
    RpcInvocation
    • RpcInvocation的核心变量包括方法名、参数类型、参数值、附带上下文数据。
    • methodName为方法名。
    • parameterTypes为参数类型。
    • arguments为参数值。
    • attachments为附带上下文数据。

    异步调用流程

    dubbo异步调用.jpg

    DubboInvoker

    public class DubboInvoker<T> extends AbstractInvoker<T> {
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                // 是否异步
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                // 是否单向
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                // 超时时间
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    
                if (isOneway) {
                    // 处理单向发送
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    // 处理异步发送
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    // 处理同步发送
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    }
    
    • 异步调用执行ResponseFuture future = currentClient.request(inv, timeout)发送请求。
    • 异步调用执行RpcContext.getContext().setFuture(new FutureAdapter<Object>(future))保存future到RpcContext
    • 同步调用执行RpcContext.getContext().setFuture(null)设置RpcContext为空。
    • 同步调用执行currentClient.request(inv, timeout).get()等待同步消息结果。
    • 同步执行和异步执行的差别在于同步发送requst之后执行get()同步等待结果,异步执行发送request之后保存future到RpcContext上下文。

    HeaderExchangeChannel

    final class HeaderExchangeChannel implements ExchangeChannel {
    
        private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);
    
        private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";
    
        private final Channel channel;
    
        private volatile boolean closed = false;
    
        HeaderExchangeChannel(Channel channel) {
            if (channel == null) {
                throw new IllegalArgumentException("channel == null");
            }
            this.channel = channel;
        }
    
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            // 保存channel、req等信息到DefaultFuture对象当中
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                // 调用底层逻辑发送消息
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
    }
    
    • HeaderExchangeChannel的request()方法内部会创建Request对象,核心变量包括version、data的变量。
    • channel.send()方法中channel指的是NettyClient对象。
    • HeaderExchangeChannel.request()方法返回DefaultFuture对象,用于保存异步至上下文的RpcContext当中
    • HeaderExchangeChannel的request()方法内部创建Request对象,创建DefaultFuture对象(包含request对象),调用NettyClient.send()异步发送消息。

    Request

    public class Request {
    
        public static final String HEARTBEAT_EVENT = null;
    
        public static final String READONLY_EVENT = "R";
    
        private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    
        private final long mId;
    
        private String mVersion;
    
        private boolean mTwoWay = true;
    
        private boolean mEvent = false;
    
        private boolean mBroken = false;
    
        private Object mData;
    
        public Request() {
            mId = newId();
        }
    
        public Request(long id) {
            mId = id;
        }
    
        private static long newId() {
            // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
            return INVOKE_ID.getAndIncrement();
        }
    
        public long getId() {
            return mId;
        }
    }
    
    • Request对象的核心字段INVOKE_ID,全局静态用于记录标识request对象的唯一性。
    • Request对象的核心变量如上图所示,其中mData保存RpcInvocation对象。

    DefaultFuture

    public class DefaultFuture implements ResponseFuture {
    
        private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);
        // 保存等待响应的Channel
        private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
        // 保存等待响应的DefaultFuture
        private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    
        static {
            // 超时检测线程
            Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
            th.setDaemon(true);
            th.start();
        }
    
        // invoke id.
        private final long id;
        private final Channel channel;
        private final Request request;
        private final int timeout;
        // 核心的lock和done字段
        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
    
        private final long start = System.currentTimeMillis();
        private volatile long sent;
        private volatile Response response;
        private volatile ResponseCallback callback;
    
        public DefaultFuture(Channel channel, Request request, int timeout) {
            this.channel = channel;
            this.request = request;
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // put into waiting map.
            FUTURES.put(id, this);
            CHANNELS.put(id, channel);
        }
    
        // 超时检测线程RemotingInvocationTimeoutScan
        private static class RemotingInvocationTimeoutScan implements Runnable {
    
            @Override
            public void run() {
                while (true) {
                    try {
                        // 遍历所有保存的DefaultFuture对象检测超时
                        for (DefaultFuture future : FUTURES.values()) {
                            if (future == null || future.isDone()) {
                                continue;
                            }
    
                            // 检测超时的处理逻辑
                            if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                                // 创建Response对象,唯一标识符为Request的唯一标识mId
                                // future.getId() 等价于request.getId()
                                Response timeoutResponse = new Response(future.getId());
                                // set timeout status.
                                timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                                timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                                // 调用DefaultFuture.received执行超时响应逻辑
                                DefaultFuture.received(future.getChannel(), timeoutResponse);
                            }
                        }
                        Thread.sleep(30);
                    } catch (Throwable e) {
                    }
                }
            }
        }
    
        // 处理数据接受的逻辑或者超时响应的逻辑received => doReceived
        public static void received(Channel channel, Response response) {
            try {
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    future.doReceived(response);
                } else {
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        }
    
        // 内部通过done.signal()方式通知等待的线程异步结果。
        private void doReceived(Response res) {
            // 通过lock来实现互斥
            lock.lock();
            try {
                response = res;
                if (done != null) {
                    // 通知等待线程
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
            // 如果配置回调函数就执行回调函数callback
            if (callback != null) {
                invokeCallback(callback);
            }
        }
    
        // 以下逻辑是同步等待的逻辑
        public Object get() throws RemotingException {
            return get(timeout);
        }
    
        @Override
        public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            return returnFromResponse();
        }
    
        private Object returnFromResponse() throws RemotingException {
            Response res = response;
            if (res == null) {
                throw new IllegalStateException("response cannot be null");
            }
            if (res.getStatus() == Response.OK) {
                return res.getResult();
            }
            if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
            }
            throw new RemotingException(channel, res.getErrorMessage());
        }
    }
    
    • DefaultFuture作为异步实现的核心,本质上通过ReentrantLock来实现异步通知。
    • Lock lock = new ReentrantLock(),互斥锁用于保证单个DefaultFuture的线程安全。
    • Condition done = lock.newCondition(),用于单个DefaultFuture的唤醒通知机制。
    • DefaultFuture包含全局唯一的静态线程RemotingInvocationTimeoutScan用于扫描超时的DefaultFuture对象。
    • DefaultFuture包含静态变量FUTURES保存所有请求的DefaultFuture对象。
    • RemotingInvocationTimeoutScan扫描超时线程后会执行DefaultFuture的received => doReceived流程进行响应。
    • 正常响应返回的处理流程会执行DefaultFuture的received => doReceived流程进行响应。
    • DefaultFuture的get方法用于执行等待操作,通过done.await()方法实现。

    正常返回的处理流程

    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    
        protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);
    
        public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;
    
        public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;
    
        private final ExchangeHandler handler;
    
        public HeaderExchangeHandler(ExchangeHandler handler) {
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            this.handler = handler;
        }
    
        static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }
    
        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle request.
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    if (isClientSide(channel)) {
                        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                        logger.error(e.getMessage(), e);
                    } else {
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    }
    
    • 正常响应的时候通过received => handleResponse执行到DefaultFuture的received进行响应。

    结论

    • Dubbo的异步调用流程的底层核心借助于NettyClient的异步过程。
    • Dubbo的异步调用流程的每个请求对象Request都有唯一的标识符(使用递增的数字标识)。
    • Dubbo的异步调用流程的核心逻辑通过DefaultFuture来完成,底层逻辑是通过ReentrantLock来实现的。

    参考

    Dubbo异步调用(七)

    相关文章

      网友评论

        本文标题:Dubbo 异步调用过程

        本文链接:https://www.haomeiwen.com/subject/fczdoctx.html