源码分析基于dubbo 2.6.0
前面讲到,RegistryProtocol.doLocalExport负责启动server端网络通讯服务
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// 对Invoker进行了代理
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
这里invokerDelegete中url的格式为dubbo://(server ip):(server port)/com.dubbo.start.service.HelloService?class=com.dubbo.start.service.HelloServiceImpl&dubbo=2.6.0&interface=com.dubbo.start.service.HelloService&methods=hello2,hello&...
, 所以会调用DubboProtocol,但会先经过两个装饰类ProtocolListenerWrapper,ProtocolFilterWrapper。
先看看DubboProtocol.export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 创建DubboExporter
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
...
openServer(url);
optimizeSerialization(url);
return exporter;
}
注意一下,这里会构建DubboExporter,并缓存到exporterMap。(后面会用到)
关键就是openServer
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
如果ExchangeServer不存在,就createServer
private ExchangeServer createServer(URL url) {
// url 参数处理
...
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
...
return server;
}
Exchangers.bind会调用HeaderExchanger.bind(注意第二个参数DubboProtocol.requestHandler)
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
出现了一个新的概念Transporter(运输层)。
Transporters同样通过ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
找到一个Transporter实现类,默认使用netty3, 会根据url中server/transporter参数转换。
我配置了netty4,所以会使用com.alibaba.dubbo.remoting.transport.netty4包下的netty代码。
NettyTransporter
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
NettyServer构造方法
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
先看看ChannelHandlers.wrap
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
这里出现一个新的概念Dispatcher,它是dubbo中的调度器,默认是AllDispatcher
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
再回调一下HeaderExchanger.bind方法, Transporters.bind(URL url, ChannelHandler... handlers)
第二个参数ChannelHandler为new DecodeHandler(new HeaderExchangeHandler(handler))
(注意new HeaderExchangeHandler(handler)中的handler为DubboProtocol.requestHandler)
ChannelHandler是一条责任链,到这里,责任链国家完成,节点依次为 MultiMessageHandler > HeartbeatHandler > AllChannelHandler > DecodeHandler > HeaderExchangeHandler > DubboProtocol.requestHandler
NettyServer继承了AbstractServer,AbstractServer的构造方法会调用NettyServer.doOpen
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ServerBootstrap();
// bossGroup中线程数为1
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
关键是netty handler的实现类NettyServerHandler。注意它的构造方法第二个参数为this,它接受的是ChannelHandler类型,就是说NettyServer也继承了ChannelHandler。
NettyServerHandler继承了netty的ChannelDuplexHandler,这里只关注channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
handler就是NettyServer, 他会调用上面说的ChannelHandler责任链。会调用到AllChannelHandler.received
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
...
}
}
就是给ExecutorService提交一个任务,异步处理请求。
看看ChannelEventRunnable.run,会根据state进行不同的处理。但实际请求都转发到handler处理(就是继续调用责任链)。
再看看DecodeHandler.received
public void received(Channel channel, Object message) throws RemotingException {
// 解码
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
// 调用下一个节点
handler.received(channel, message);
}
HeaderExchangeHandler
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// 处理请求
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// 处理响应
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
...
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
twoWay表示需要响应的请求。这里调用handleRequest方法,将返回一个Response,最后通过channel将它send到client。
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
终于调用到DubboProtocol.requestHandler了
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
...
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
这里也是通过invoker调用,逐渐调用到我们的业务方法。
先看看getInvoker方法
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
...
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
return exporter.getInvoker();
}
很简单,从exporterMap中获取DubboExporter,再获取对应的Invoker。
这时回顾一下上文说到的DubboProtocol.export方法,它用invoker构建了DubboExporter,缓存到exporterMap中。
invoker也是责任链。那么invoker在哪里创建的呢?
这时要回到dubbo server启动,那里说到ServiceConfig.doExportUrlsFor1Protocol
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
}
}
是的,invoker就是在这里创建的。(注意,参数ref就是方法的实现类,栗子中就是HelloServiceImpl)
proxyFactory默认使用的是JavassistProxyFactory。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
前面也说过了,DubboProtocol处理前,会经过两个包装类ProtocolListenerWrapper,ProtocolFilterWrapper。
ProtocolFilterWrapper.export 会调用buildInvokerChain,为每一个filter创建一个invoker,
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
dubbo默认有如下
- EchoFilter
- ClassLoaderFilter
- GenericFilter
- ContextFilter
- TraceFilter
- TimeoutFilter
- MonitorFilter
- ExceptionFilter
- ValidationFilter
再回顾一下上文的RegistryProtocol.doLocalExport方法,对Invoker进行代理包装,生成了InvokerDelegete。所以invoker责任链节点为ProtocolFilterWrapper&Invoker > RegistryProtocol&InvokerDelegete > DelegateProviderMetaDataInvoker > JavassistProxyFactory&AbstractProxyInvoker。
JavassistProxyFactory中创建的invoke,会通过Wrapper调用到实际的业务方法。
com.alibaba.dubbo.common.bytecode.Wrapper
也是动态生成代码,JavassistProxyFactory正是通过它调用业务方法。
看看getWrapper方法
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
c = c.getSuperclass();
if (c == Object.class)
return OBJECT_WRAPPER;
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
}
return ret;
}
关键在makeWrapper,makeWrapper会拼凑代码字符串,再通过javassist生成代理类。
过程比较繁琐,直接看生成的代理类吧。
原接口
public interface HelloService {
String hello(String user) ;
String hello2(String user) ;
}
生成的Wrapper,关键的方法在invokeMethod
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{
com.dubbo.start.service.HelloService w;
try{
w = ((com.dubbo.start.service.HelloService)$1);
} catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if( "hello".equals( $2 ) && $3.length == 1 ) {
return ($w)w.hello((java.lang.String)$4[0]);
}
if( "hello2".equals( $2 ) && $3.length == 1 ) {
return ($w)w.hello2((java.lang.String)$4[0]);
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
}
$1, $2, $3在javassist中表示方法参数。
可以看到,通过方法名和参数数调用对应的逻辑方法。
所以dubbo暴露的接口就尽量不要方法重构了
网友评论