pigeon源码分析-同步调用和异步调用
Pigeon是美团点评内部广泛使用的一个分布式服务通信框架(RPC),本文所有分析基于 pigeon 开源版本:RPC framework of DIANPING,具体版本为:2.9.12-SNAPSHOT
概要:先从调用方视角 RPC 框架基本原理,然后介绍动态代理的生成,从这里切入之后再详细介绍同步、异步调用;
RPC就是要在分布式场景下完成这样一个动作: Result result = service.call(args)
,在这个=
等号背后 RPC 框架做了很多事。
RPC 调用在客户端视角下的分解
简单来说可以分成这几步:
- Client client = findClient(); // 这一步主要是服务治理相关,本文不关注;
- client.write(request);
- Object resp = waitResponse();
- Result result = convert2Result(resp);
pigeon 在客户端是如何生成代理的?
远程调用离不开动态代理,其实 pigeon service 在客户端视角就是生成了一个被调用接口的实现类,通过网络请求来返回数据,这样就好像实现类在本地一样;
生成代理的逻辑:
在 bean 初始化的时候,获取 Proxy 实例,代码如下:
// com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean#init
public void init() throws Exception {
// 进行初始化配置
this.obj = ServiceFactory.getService(invokerConfig);
// 略
}
使用 JDK 动态代理,真正执行的逻辑都在 java.lang.reflect.InvocationHandler
中,
可以看到 invoke() 时执行的是 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult
,执行的逻辑是一个 FilterChain,成员见 com.dianping.pigeon.remoting.invoker.process.InvokerProcessHandlerFactory#init
,每个 InvocationInvokeFilter 的作用基本见名知意,完成一些统计、上下文准备、监控等功能;
终于到了 RemoteCallInvokeFilter,这是执行远程调用最关键的一步,主要代码如下:
//com.dianping.pigeon.remoting.invoker.process.filter.RemoteCallInvokeFilter#invoke
@Override
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
// 略
try {
switch (callMethod) {
case SYNC:
// do call
break;
case CALLBACK:
// do call
break;
case FUTURE:
//do call
break;
case ONEWAY:
//do call
break;
default:
throw new BadRequestException("Call type[" + callMethod.getName() + "] is not supported!");
}
((DefaultInvokerContext)invocationContext).setResponse(response);
afterInvoke(invocationContext);
} catch (Throwable t) {
afterThrowing(invocationContext, t);
throw t;
}
return response;
}
可以看到,pigeon 支持四种调用类型:sync, callback, future, oneway
这里主要分析 sync 和 future,分别对应一般意义上的同步、异步调用模式,其他两种是在这两种上做了特殊处理;
SYNC-同步调用
同步调用,主要逻辑:
case SYNC:
CallbackFuture future = new CallbackFuture();
response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
if (response == null) {
response = future.getResponse(request.getTimeout());
}
break;
先初始化了一个 callbackFuture,之后 sendRequest(),最后获取 response
进一步查看 sendRequest() 实现:
com.dianping.pigeon.remoting.invoker.util.InvokerUtils#sendRequest(com.dianping.pigeon.remoting.invoker.Client, com.dianping.pigeon.remoting.common.domain.InvocationRequest, com.dianping.pigeon.remoting.invoker.concurrent.Callback)
--> com.dianping.pigeon.remoting.invoker.AbstractClient#write
实际的 doWrite() 有两种实现,HTTP / netty,这里看 NettyClient
--> com.dianping.pigeon.remoting.netty.invoker.NettyClient#doWrite
// 调用 netty client 写请求
@Override
public InvocationResponse doWrite(InvocationRequest request) throws NetworkException {
NettyChannel channel = null;
try {
channel = channelPool.selectChannel();
ChannelFuture future = channel.write0(request);
afterWrite(request, channel);
if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE
|| request.getMessageType() == Constants.MESSAGE_TYPE_HEART) {
future.addListener(new MessageWriteListener(request, channel));
}
} catch (Exception e) {
throw new NetworkException("[doRequest] remote call failed:" + request, e);
}
return null;
}
仔细看这里永远返回 null,然后再看 sync 调用执行的下一步,就是 response = future.getResponse(request.getTimeout());
, 判断 sendRequest() 返回结果为 null,就从 future 获取结果,这两步就联系起来了;
而 getResponse(timeout) 所做的事情只有一件:while 循环等待到 isDone == true || timeout,这里不详细展示,看代码即可;
拓展一下,这里只做等待,那么返回值从哪里来呢?答案是等待 socket 的另一端也就是 server 回写;
回过头来看,remoteCallInvocationFilter 所做的就是写请求,然后等待接受返回值;
FUTURE-异步调用
Future 调用是 pigeon 提供的标准异步调用模式。使用方式如下:
//调用ServiceA的method1
serviceA.method1("aaa");
//获取ServiceA的method1调用future状态
Future future1OfServiceA = InvokerHelper.getFuture();
//调用ServiceA的method2
serviceA.method2("bbb");
//获取ServiceA的method2调用future状态
Future future2OfServiceA = InvokerHelper.getFuture();
//获取ServiceA的method2调用结果
Object result2OfServiceA = future2OfServiceA.get();
//获取ServiceA的method1调用结果
Object result1OfServiceA = future1OfServiceA.get();
简单来说就是:调用 —> 获取 Future —> 获取结果。
因为每次调用不用阻塞等待结果,而是拿到 Future 实例之后由开发者在合适的时机获取返回值,所以合理使用能提高并发性;
来看下实现:
case FUTURE:
ServiceFutureImpl futureImpl = new ServiceFutureImpl(invocationContext, request.getTimeout());
InvokerUtils.sendRequest(client, invocationContext.getRequest(), futureImpl);
FutureFactory.setFuture(futureImpl);
response = InvokerUtils.createFutureResponse(futureImpl);
invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
break;
可以看到只有 sendRequest(),没有等待返回,开发文档中有介绍,Future 调用之后需要开发者自行通过 future.get(timeout);
获取返回值;
FutureFactory.setFuture(futureImpl);
这一步做了什么?
public class FutureFactory {
private static ThreadLocal<Future<?>> threadFuture = new ThreadLocal<Future<?>>();
public static void setFuture(Future<?> future) {
threadFuture.set(future);
}
其实就是将 Future 实例设置到 ThreadLocal 中,所以每次调用结束之后立即通过 Future future1OfServiceA = InvokerHelper.getFuture();
获取Future实例,否则如果紧接着进行了下一次future调用,就会因为ThreadLocal 中的 future 被覆盖而无法正确获取返回值;
思考 :返回值的 Future 是什么作用?
答案:其实 pigeon 中的 Future 实现了 JDK 的标准语义,即:A Future represents the result of an asynchronous computation .
代表了一次异步计算的结果。简单来说,就是一个暂时代表结果的占位符。
需要结果的时候就去看看有没有 response 了,有就返回,没有就阻塞到超时时间到;
pigeon 同步调用与异步调用处理有哪些不同
同步、异步,都是先发送完请求,然后等待结果。
SYNC 模式下,立即等待直到拿到结果或者超时,然后将结果(超时情况下则是异常)返回给调用方;
FUTURE 模式下,立即返回个占位符 Future,这里的实现类是:com.dianping.pigeon.remoting.invoker.concurrent.ServiceFutureImpl#ServiceFutureImpl
,将真正获取返回值的时机交给调用方控制;
这里也因为 Future 模式实现时使用了 ThreadLocal 暂时存储占位符,所以多次 Future 调用,一定要顺序获取结果;
网友评论