最近在学习Java并发相关的内容,偶然看到了介绍Dubbo 的DefaultFuture,之前也没有对这方面做过什么深入探索,索性这次就来一个一探究竟。
直入主题
DefaultFuture是实现自ResponseFuture,而ResponseFuture又是ExchangeChannel的一个处理的结果。
/**
* Future. (API/SPI, Prototype, ThreadSafe)
*
* @see com.alibaba.dubbo.remoting.exchange.ExchangeChannel#request(Object)
* @see com.alibaba.dubbo.remoting.exchange.ExchangeChannel#request(Object, int)
* @author qian.lei
* @author william.liangf
*/
public interface ResponseFuture {
/**
* get result.
*
* @return result.
*/
Object get() throws RemotingException;
/**
* get result with the specified timeout.
*
* @param timeoutInMillis timeout.
* @return result.
*/
Object get(int timeoutInMillis) throws RemotingException;
/**
* set callback.
*
* @param callback
*/
void setCallback(ResponseCallback callback);
/**
* check is done.
*
* @return done or not.
*/
boolean isDone();
}
这里我们顺便就从ExchangeChannel简单看下他是如何给我们ResponseFuture。在HeaderExchangeChannel中
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
看到没,在这里他有定义了一个DefaultFuture并将该值在最后返回。由于底下的send是使用了netty进行操作,而执行到netty发送消息时,不会等待结果,直接返回,所以这里就需要了DefaultFuture来进行一个封装处理,将"异步转同步"操作,这也就是DefaultFuture的价值体现了。我们将该出的request往上层调用查询,可以看到,最初的调用来自于DubboInvoker的doInvoke。
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
这一步就是通过异步返回的Future进行一个get的调用。由此可以推断,将异步转为同步的实现就是在get中的内容。那么我们继续往下看:
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
核心功能代码就是在当中的while循环中了,这里使用Condition的await进行一个等待,通过isDone来判断结果是否已经返回。当有结果返回的时候,对Condition进行发送signal进行一个通知,释放等待返回结果。
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
在DefaultFuture中的操作,都是通过一个CHANNELS和FUTURES两个ConcurrentHashMap来维护Dubbo的Request的id和channel和Future的关系。(id是通过Request中创建生成
private static long newId() {
// getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
return INVOKE_ID.getAndIncrement();
}
)
现在就一个问题,这个Response是在什么时候返回? Dubbo又是如何调用到DefaultFuture的received方法,继而调用到doReceived返回结果.
这一步的调用,我们放到下一篇文章中进行一个追踪分析。
网友评论