美文网首页
Dubbo DefaultFuture 初探

Dubbo DefaultFuture 初探

作者: 谁说咖啡不苦 | 来源:发表于2019-04-02 23:07 被阅读0次

最近在学习Java并发相关的内容,偶然看到了介绍Dubbo 的DefaultFuture,之前也没有对这方面做过什么深入探索,索性这次就来一个一探究竟。

直入主题

DefaultFuture是实现自ResponseFuture,而ResponseFuture又是ExchangeChannel的一个处理的结果。

/**
 * Future. (API/SPI, Prototype, ThreadSafe)
 * 
 * @see com.alibaba.dubbo.remoting.exchange.ExchangeChannel#request(Object)
 * @see com.alibaba.dubbo.remoting.exchange.ExchangeChannel#request(Object, int)
 * @author qian.lei
 * @author william.liangf
 */
public interface ResponseFuture {

    /**
     * get result.
     * 
     * @return result.
     */
    Object get() throws RemotingException;

    /**
     * get result with the specified timeout.
     * 
     * @param timeoutInMillis timeout.
     * @return result.
     */
    Object get(int timeoutInMillis) throws RemotingException;

    /**
     * set callback.
     * 
     * @param callback
     */
    void setCallback(ResponseCallback callback);

    /**
     * check is done.
     * 
     * @return done or not.
     */
    boolean isDone();

}

这里我们顺便就从ExchangeChannel简单看下他是如何给我们ResponseFuture。在HeaderExchangeChannel中

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

看到没,在这里他有定义了一个DefaultFuture并将该值在最后返回。由于底下的send是使用了netty进行操作,而执行到netty发送消息时,不会等待结果,直接返回,所以这里就需要了DefaultFuture来进行一个封装处理,将"异步转同步"操作,这也就是DefaultFuture的价值体现了。我们将该出的request往上层调用查询,可以看到,最初的调用来自于DubboInvoker的doInvoke。

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();

这一步就是通过异步返回的Future进行一个get的调用。由此可以推断,将异步转为同步的实现就是在get中的内容。那么我们继续往下看:

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

核心功能代码就是在当中的while循环中了,这里使用Condition的await进行一个等待,通过isDone来判断结果是否已经返回。当有结果返回的时候,对Condition进行发送signal进行一个通知,释放等待返回结果。

private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

在DefaultFuture中的操作,都是通过一个CHANNELS和FUTURES两个ConcurrentHashMap来维护Dubbo的Request的id和channel和Future的关系。(id是通过Request中创建生成

private static long newId() {
        // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
        return INVOKE_ID.getAndIncrement();
    }

现在就一个问题,这个Response是在什么时候返回? Dubbo又是如何调用到DefaultFuture的received方法,继而调用到doReceived返回结果.
这一步的调用,我们放到下一篇文章中进行一个追踪分析。

相关文章

  • Dubbo DefaultFuture 初探

    最近在学习Java并发相关的内容,偶然看到了介绍Dubbo 的DefaultFuture,之前也没有对这方面做过什...

  • Dubbo源码分析----DefaultFuture

    前面两篇文章已经分析了provider和consumer之间的通信过程,那么还有几个问题: 由于请求是异步的,pr...

  • Dubbo 传输协作线程

    Dubbo 传输协作线程-------DefaultFuture 用到了锁和同步队列1.客户端调度线程:用于发起远...

  • dubbo初探

    定义快速开始2.1 demo-provider部分2.2 demo-consumer部分2.3 启动 1.定义 D...

  • dubbo rpc 初探

    由于网络的抖动的原因,经常在cat监控当中,看到dubbo的com.alibaba.dubbo.rpc.RpcEx...

  • Dubbo Admin 初探

    Dubbo Admin介绍 在Dubbo2.6.0版本之前,Dubbo的工程内部包含有dubbo-admin的模块...

  • dubbo-mybatis初探

    前言 我方阵营项目中用到的微服务架构是spring cloud系,持久层用的是Hibernate+QueryDSL...

  • Dubbo zookeeper 初探【转】

    先把zookeeper在本地给安装好, 这里的话讲述了两个工程一个工程是提供服务的,一个工程是调用服务的,因为du...

  • Dubbo Admin 2.7.X 初探

    开篇 为了统一不同配置中心(如Zookeeper、Nacos、Etcd等)对于服务治理规则的实现,Dubbo2.7...

  • 苹果 ARKit 初探

    苹果 ARKit 初探 苹果 ARKit 初探

网友评论

      本文标题:Dubbo DefaultFuture 初探

      本文链接:https://www.haomeiwen.com/subject/iotobqtx.html