This source analysis is based on Dubbo 2.7.1.
As covered earlier, RegistryProtocol.doLocalExport exports the service:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
    });
}
Here providerUrl has the form dubbo://(server ip):(server port)/com.dubbo.start.service.HelloService?...
(it is derived from the registryUrl registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?&export=dubbo...),
so the call is routed to DubboProtocol, but only after passing through the wrapper classes ProtocolListenerWrapper and ProtocolFilterWrapper.
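The step from registryUrl to providerUrl can be illustrated with a small sketch. It assumes that the provider URL travels URL-encoded in the export query parameter of the registry URL; the class ExportParamSketch, the method providerUrlOf, and the address 192.168.1.10:20880 are hypothetical names and values used only for illustration, not Dubbo's own API.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

// Illustrative only: extracts and decodes the "export" parameter of a registry URL.
class ExportParamSketch {

    // Returns the decoded value of the "export" query parameter, or null if it is absent.
    static String providerUrlOf(String registryUrl) {
        int q = registryUrl.indexOf('?');
        if (q < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && "export".equals(pair.substring(0, eq))) {
                try {
                    return URLDecoder.decode(pair.substring(eq + 1), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e); // UTF-8 is always supported
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical registry URL with an encoded provider URL in "export".
        String registryUrl = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"
                + "?&export=dubbo%3A%2F%2F192.168.1.10%3A20880%2Fcom.dubbo.start.service.HelloService%3Fanyhost%3Dtrue";
        System.out.println(providerUrlOf(registryUrl));
    }
}
```

Because the decoded URL starts with dubbo://, the protocol lookup resolves to DubboProtocol rather than back to RegistryProtocol.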
First, look at DubboProtocol.export:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    String key = serviceKey(url);
    // Create the DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    ...
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}
Note that a DubboExporter is built here and cached in exporterMap. (It will be used later.)
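To make the role of this cache concrete, here is a minimal sketch of a map keyed by service key. The key layout ([group/]path[:version]:port), the class ExporterMapSketch, and its methods are assumptions made for illustration; a plain String stands in for the DubboExporter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a service-key-to-exporter cache in the spirit of exporterMap.
class ExporterMapSketch {
    // The String value is a stand-in for a DubboExporter.
    final Map<String, String> exporterMap = new ConcurrentHashMap<>();

    // Assumed key layout: [group/]path[:version]:port
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(path);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        return buf.append(':').append(port).toString();
    }

    // Export side: register the exporter under its service key.
    void export(int port, String path, String version, String group, String exporter) {
        exporterMap.put(serviceKey(port, path, version, group), exporter);
    }

    // Request side: rebuild the same key from the request data and look it up.
    String lookup(int port, String path, String version, String group) {
        return exporterMap.get(serviceKey(port, path, version, group));
    }

    public static void main(String[] args) {
        ExporterMapSketch sketch = new ExporterMapSketch();
        sketch.export(20880, "com.dubbo.start.service.HelloService", null, null, "helloExporter");
        System.out.println(sketch.lookup(20880, "com.dubbo.start.service.HelloService", null, null));
    }
}
```

The point of the design is that the export side and the request side derive the same key independently, so no other shared state is needed to locate the right exporter.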
The key step is openServer:
private void openServer(URL url) {
    String key = url.getAddress();
    // A client can export a service that is only invoked by the server
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            server.reset(url);
        }
    }
}
If no ExchangeServer exists yet for that address, createServer is called:
private ExchangeServer createServer(URL url) {
    // URL parameter handling
    ...
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    ...
    return server;
}
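The effect of keying serverMap by address is that several services exported on the same ip:port share one server. The sketch below reproduces only that caching shape; SketchServer, ServerCacheSketch, and the OtherService URL are hypothetical stand-ins, and the real ExchangeServer does far more than count resets.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ExchangeServer; it only counts reset calls.
class SketchServer {
    final String address;
    int resetCount = 0;

    SketchServer(String address) {
        this.address = address;
    }

    void reset(String url) {
        resetCount++;
    }
}

class ServerCacheSketch {
    final Map<String, SketchServer> serverMap = new ConcurrentHashMap<>();
    int createdCount = 0;

    // Same shape as openServer: the cache key is the address, not the full URL.
    void openServer(String address, String url) {
        SketchServer server = serverMap.get(address);
        if (server == null) {
            serverMap.put(address, createServer(address));
        } else {
            server.reset(url);
        }
    }

    private SketchServer createServer(String address) {
        createdCount++;
        return new SketchServer(address);
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        // Two services on the same address: the second call reuses the first server.
        protocol.openServer("192.168.1.10:20880", "dubbo://192.168.1.10:20880/com.dubbo.start.service.HelloService");
        protocol.openServer("192.168.1.10:20880", "dubbo://192.168.1.10:20880/com.dubbo.start.service.OtherService");
        System.out.println("servers created: " + protocol.createdCount);
    }
}
```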
Exchange
Exchangers.bind(url, requestHandler)
The second argument here, requestHandler, is an instance of an inner class defined in DubboProtocol. It is a key class and is discussed later. This call ends up in HeaderExchanger.bind:
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Transporter
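The Transporter interface likewise provides the bind/connect methods.
Transporters uses the server/transporter URL parameters and ExtensionLoader
to locate a Transporter implementation; netty3 is the default.
I configured netty4, so the netty code under the org.apache.dubbo.remoting.transport.netty4 package is used.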
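The parameter-driven selection can be sketched as a simple name lookup. This is an assumption-laden simplification: the class TransporterChoiceSketch and its method are hypothetical, a plain Map stands in for the URL parameters, and the precedence shown (server first, then transporter, then the default) is assumed for illustration rather than taken from the ExtensionLoader code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: picks an extension name from URL parameters.
class TransporterChoiceSketch {

    // Checks "server" first, then "transporter", then falls back to the given default name.
    static String chooseName(Map<String, String> urlParams, String defaultName) {
        String name = urlParams.get("server");
        if (name == null || name.isEmpty()) {
            name = urlParams.get("transporter");
        }
        if (name == null || name.isEmpty()) {
            name = defaultName;
        }
        return name;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("server", "netty4");
        // The default name passed here is a placeholder, not a statement about Dubbo's default.
        System.out.println(chooseName(params, "someDefault"));
    }
}
```

The chosen name would then be resolved to a concrete implementation class, which is the part ExtensionLoader handles in Dubbo.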
NettyTransporter
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}
NettyServer implements the Server interface, which provides methods such as isBound/getChannels/reset.
NettyServer constructor
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
First, look at ChannelHandlers.wrap, which delegates to wrapInternal:
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}
Dispatcher
A new concept appears here: Dispatcher. It is Dubbo's dispatcher for incoming events, and the default is AllDispatcher:
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
    return new AllChannelHandler(handler, url);
}
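The idea behind a dispatching handler can be sketched as a wrapper that hands each message to a thread pool instead of processing it on the calling (I/O) thread. This is a simplified illustration under that assumption, not AllChannelHandler itself; MessageHandler, PooledHandler, DispatchSketch, and the thread name biz-pool-1 are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, much reduced version of a channel handler.
interface MessageHandler {
    void received(String message);
}

// Wraps a handler and runs it on a pool thread rather than on the caller's thread.
class PooledHandler implements MessageHandler {
    private final MessageHandler delegate;
    private final ExecutorService executor;

    PooledHandler(MessageHandler delegate, ExecutorService executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    @Override
    public void received(String message) {
        executor.execute(() -> delegate.received(message));
    }
}

class DispatchSketch {
    // Sends one message through a PooledHandler and returns the name of the handling thread.
    static String handlingThreadFor(String message) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-pool-1"));
        CompletableFuture<String> seenOn = new CompletableFuture<>();
        MessageHandler business = m -> seenOn.complete(Thread.currentThread().getName());
        new PooledHandler(business, pool).received(message);
        try {
            return seenOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handled on: " + handlingThreadFor("hello"));
    }
}
```

Keeping business logic off the I/O threads means a slow service method does not stall reads and writes for other connections.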
NettyServer extends AbstractServer, and the AbstractServer constructor calls NettyServer.doOpen:
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    // Build bossGroup with a single thread
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    // Build the NettyServerHandler
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
.addLast("decoder", adapter.getDecoder())
and .addLast("encoder", adapter.getEncoder())
add the decoder and encoder, which convert between raw bytes and Dubbo objects. The dubbo protocol uses DubboCountCodec.
.addLast("handler", nettyServerHandler)
adds nettyServerHandler to process requests.
new NettyServerHandler(getUrl(), this)
When the NettyServerHandler is constructed, NettyServer passes itself (this) as an argument, because NettyServer also implements ChannelHandler. Constructing NettyServer in turn requires a ChannelHandler argument.
ChannelHandler is the key interface for handling requests; it provides methods such as received/sent.
Dubbo uses the decorator pattern to layer different capabilities onto ChannelHandler:
NettyServer ---> MultiMessageHandler(NettyServer) ---> HeartbeatHandler(NettyServer) ---> AllChannelHandler(AllDispatcher) ---> DecodeHandler(HeaderExchanger) ---> HeaderExchangeHandler(HeaderExchanger) ---> DubboProtocol.requestHandler
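The chain above can be mimicked with a small decorator sketch that records the order in which each layer sees a message. Only the nesting order is taken from the text; SketchHandler, NamedWrapper, and HandlerChainSketch are hypothetical, and every wrapper here merely logs its name before delegating, whereas the real handlers each add their own behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much reduced ChannelHandler with a trace for illustration.
interface SketchHandler {
    void received(String msg, List<String> trace);
}

// A decorator: records its own name, then delegates to the wrapped handler.
class NamedWrapper implements SketchHandler {
    private final String name;
    private final SketchHandler next;

    NamedWrapper(String name, SketchHandler next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void received(String msg, List<String> trace) {
        trace.add(name);
        next.received(msg, trace);
    }
}

class HandlerChainSketch {
    // Builds the chain innermost-first, as the nested constructors do, and traces one message.
    static List<String> traceOf(String msg) {
        SketchHandler requestHandler = (m, t) -> t.add("requestHandler");
        SketchHandler chain =
                new NamedWrapper("MultiMessageHandler",
                        new NamedWrapper("HeartbeatHandler",
                                new NamedWrapper("AllChannelHandler",
                                        new NamedWrapper("DecodeHandler",
                                                new NamedWrapper("HeaderExchangeHandler", requestHandler)))));
        List<String> trace = new ArrayList<>();
        chain.received(msg, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ---> ", traceOf("request")));
    }
}
```

Since every layer implements the same interface, a layer can be added or swapped without the layers around it changing.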