Dubbo 2.6.5: How TimeoutException Is Triggered

Author: 晴天哥_王志 | Published: 2020-02-22 22:43

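    Introduction

    • This article explains when TimeoutException is thrown in Dubbo 2.6.5. The mechanism is simple, and a quick read through the code makes the resulting exception message much easier to interpret.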

    DubboInvoker#doInvoke

    public class DubboInvoker<T> extends AbstractInvoker<T> {
    
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    RpcContext.getContext().setFuture(null);
                    // Send the request and block until the result arrives
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    }
    
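    • For a synchronous call, DubboInvoker's doInvoke() method executes currentClient.request(inv, timeout).get().
    • currentClient.request(inv, timeout) sends the request; get() blocks until the result arrives or the timeout expires.
    • A TimeoutException raised while waiting is caught in doInvoke() and rethrown as an RpcException with code RpcException.TIMEOUT_EXCEPTION.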
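The synchronous branch and its exception translation can be sketched without any Dubbo dependency. This is a minimal model, not Dubbo's code: SketchRpcException, its code constants, and SyncInvokeSketch are hypothetical names, and a CompletableFuture stands in for the ResponseFuture returned by request().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an RPC-level exception; not the Dubbo class.
final class SketchRpcException extends RuntimeException {
    static final int NETWORK = 1; // illustrative codes only
    static final int TIMEOUT = 2;
    final int code;

    SketchRpcException(int code, String message, Throwable cause) {
        super(message, cause);
        this.code = code;
    }
}

final class SyncInvokeSketch {
    // Blocks on the pending response and translates a low-level timeout
    // into an RPC-level exception, as the catch blocks in doInvoke() do.
    static String invokeSync(CompletableFuture<String> pending, long timeoutMs) {
        try {
            return pending.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new SketchRpcException(SketchRpcException.TIMEOUT,
                    "Invoke remote method timeout, cause: " + e.getMessage(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new SketchRpcException(SketchRpcException.NETWORK,
                    "Interrupted while waiting for the response", e);
        } catch (ExecutionException e) {
            throw new SketchRpcException(SketchRpcException.NETWORK,
                    "Failed to invoke remote method, cause: " + e.getMessage(), e);
        }
    }
}
```

A caller can then tell a timeout apart from other failures by checking the code field, which is the role the exception code plays for consumers of RpcException.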

    HeaderExchangeChannel#request

    final class HeaderExchangeChannel implements ExchangeChannel {
    
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }
    }
    
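    • HeaderExchangeChannel's request() method creates a DefaultFuture, sends the request over the channel, and returns the future. If the send fails, the future is cancelled and the RemotingException is rethrown.
    • The get() that DubboInvoker calls is therefore DefaultFuture's get(). The no-argument get() is expected to delegate to get(int timeout) using the timeout passed to the constructor; that overload is the one listed in the next section.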
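The ordering in request() (create the future, then send, then cancel on failure) can be illustrated with a small self-contained model. Everything here is an assumption made for illustration: RequestSketch, the Sender interface, and the pending map keyed by request id are hypothetical, and the article does not show how Dubbo itself pairs responses with futures.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class RequestSketch {
    // Hypothetical transport hook: returns normally or throws if the send fails.
    interface Sender {
        void send(long requestId, Object payload);
    }

    private final AtomicLong ids = new AtomicLong();
    // Assumed bookkeeping: pending futures keyed by request id.
    final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();
    private final Sender sender;

    RequestSketch(Sender sender) {
        this.sender = sender;
    }

    CompletableFuture<Object> request(Object payload) {
        long id = ids.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future); // register before sending so a fast reply finds it
        try {
            sender.send(id, payload);
        } catch (RuntimeException e) {
            pending.remove(id);
            future.cancel(false); // no response can ever arrive for this request
            throw e;
        }
        return future;
    }

    // Called when a response for the given request id arrives.
    void received(long id, Object response) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future != null) {
            future.complete(response);
        }
    }
}
```

Registering the future before the send means a response that arrives immediately still has something to complete, and cancelling on a failed send avoids leaving a future that would only ever end in a timeout.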

    DefaultFuture#get

    public class DefaultFuture implements ResponseFuture {
    
        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
    
        public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    while (!isDone()) {
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            return returnFromResponse();
        }
    
        private void doReceived(Response res) {
            lock.lock();
            try {
                response = res;
                if (done != null) {
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
            if (callback != null) {
                invokeCallback(callback);
            }
        }
    
        public boolean isDone() {
            return response != null;
        }
    
        private String getTimeoutMessage(boolean scan) {
            long nowTimestamp = System.currentTimeMillis();
            return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
                    + (scan ? " by scan timer" : "") + ". start time: "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ","
                    + (sent > 0 ? " client elapsed: " + (sent - start)
                    + " ms, server elapsed: " + (nowTimestamp - sent)
                    : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: "
                    + timeout + " ms, request: " + request + ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress();
        }
    }
    
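    • DefaultFuture's get() method implements the timed wait with Condition.await(timeout, TimeUnit.MILLISECONDS).
    • If await() returns and no response has arrived, that is, isDone() is still false once the timeout has elapsed, a TimeoutException is thrown.
    • When the provider responds successfully, doReceived() sets response and signals the condition, so isDone() returns true and get() returns the result.
    • The text of the TimeoutException is assembled by getTimeoutMessage(): when sent > 0 it begins with "Waiting server-side response timeout", otherwise with "Sending request timeout in client-side", followed by the start and end times, the elapsed time, the configured timeout, the request, and the channel addresses.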
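The wait-and-signal pattern described above can be tried in isolation. The following is a minimal sketch that mirrors the structure of get() and doReceived(); TimedWaitSketch and WaitTimeoutException are hypothetical names, and the exception is unchecked here only to keep the example short.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class TimedWaitSketch {
    // Hypothetical unchecked stand-in for a timeout exception.
    static final class WaitTimeoutException extends RuntimeException {
        WaitTimeoutException(String message) {
            super(message);
        }
    }

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Object response;

    boolean isDone() {
        return response != null;
    }

    // Waits until a response is set or the timeout elapses.
    Object get(long timeoutMs) {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeoutMs, TimeUnit.MILLISECONDS);
                    // Leave the loop on a response or once the deadline has passed;
                    // otherwise treat the wake-up as spurious and wait again.
                    if (isDone() || System.currentTimeMillis() - start > timeoutMs) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new WaitTimeoutException("no response within " + timeoutMs + " ms");
            }
        }
        return response;
    }

    // Stores the response and wakes the waiting caller.
    void received(Object res) {
        lock.lock();
        try {
            response = res;
            done.signal();
        } finally {
            lock.unlock();
        }
    }
}
```

Calling get(50) on an instance that never receives a response throws WaitTimeoutException, while a received("pong") from another thread makes a concurrent get() return "pong". The isDone() check after the loop is what decides between the two outcomes, just as in the listing above.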
