接上文,在客户端发出请求后,首先处理的自然是netty,在IO处理之后,就进入业务处理NettyServerHandler。最终的处理任务就落在了RequestThreadPoolProcessor这个类身上,主要方法是doProcessRequest
public Future<InvocationResponse> doProcessRequest(final InvocationRequest request,
final ProviderContext providerContext) {
requestContextMap.put(request, providerContext);
doMonitorData(request, providerContext);
Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() {
@Override
public InvocationResponse call() throws Exception {
providerContext.getTimeline().add(new TimePoint(TimePhase.T));
try {
ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
.selectInvocationHandler(providerContext.getRequest().getMessageType());
if (invocationHandler != null) {
providerContext.setThread(Thread.currentThread());
return invocationHandler.handle(providerContext);
}
} catch (Throwable t) {
logger.error("Process request failed with invocation handler, you should never be here.", t);
} finally {
requestContextMap.remove(request);
}
return null;
}
};
final ThreadPool pool = selectThreadPool(request);
try {
checkRequest(pool, request);
providerContext.getTimeline().add(new TimePoint(TimePhase.T));
return pool.submit(requestExecutor);
} catch (RejectedExecutionException e) {
requestContextMap.remove(request);
throw new RejectedException(getProcessorStatistics(pool), e);
}
}
requestExecutor是整个请求的执行器,内部和请求一样,通过chain-filter来处理。selectThreadPool方法是来选择一个线程池来处理请求。pigeon提供了方法级别的线程池、服务级别的线程池和用户自定义的线程池。默认是使用共享线程池和慢速线程池。为什么会有这么多种线程池呢?比如默认提供的两种,是考虑如果有请求比较缓慢,那么会将请求扔到慢速线程池,这样不会因为几个慢速task,堵死整个线程池。同理,开发人员也可以用自己的策略来使用不同的线程池处理task。
接下来,看看服务端的filter
registerBizProcessFilter(new TraceFilter());
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new MonitorProcessFilter());
}
registerBizProcessFilter(new WriteResponseProcessFilter());
registerBizProcessFilter(new ContextTransferProcessFilter());
registerBizProcessFilter(new ExceptionProcessFilter());
registerBizProcessFilter(new SecurityFilter());
registerBizProcessFilter(new GatewayProcessFilter());
registerBizProcessFilter(new BusinessProcessFilter());
bizInvocationHandler = createInvocationHandler(bizProcessFilters);
TraceFilter和客户端类似。MonitorProcessFilter是用来处理监控的,这个要看监控开关是否打开。WriteResponseProcessFilter是利用netty将结果返回给客户端。ContextTransferProcessFilter是用来处理请求参数的。ExceptionProcessFilter是用来处理在调用过程中发生的异常,之前的版本是没有这个filter的,就会导致如果服务端的代码没有捕获异常直接抛出,异常信息没有办法正常的传递给客户端。SecurityFilter是安全相关的filter,包括黑名单、白名单、请求秘钥等等。GatewayProcessFilter处理了一些请求数拦截的事情,比如可以设置某个服务最多请求多少次之类的。BusinessProcessFilter核心就是通过反射直接调用请求的方法,完成远程方法调用。
以上两篇就是整个RPC的请求和应答处理的过程,其实比较简单,后面两遍针对一些特定的问题,讲述下pigeon是用了哪些方法解决的,主要有一些很有用的小技巧,赶脚看了源码之后学到不少。
网友评论