开篇
-
这篇文章主要是分析Dubbo Consumer在处理Provider的响应的流程,整体思路会按照Dubbo Client的初始化流程和Dubbo Client的响应流程两部分进行分析。
-
Dubbo Client的初始化流程着重分析Client的连接过程以及处理Handler的封装关系。
-
Dubbo Client的响应流程着重分析响应过程的流程,整个处理流程建立在Dubbo Client的初始化流程基础上。
-
这篇文章顺便讲解了Dubbo 2.6.5的版本Client侧线程过多的问题的原因。
Consumer Client 初始化流程
DubboProtocol
public class DubboProtocol extends AbstractProtocol {
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
// 省略相关代码
};
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 创建DubboInvoker对象过程中getClients初始化Client对象
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
private ExchangeClient initClient(URL url) {
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
ExchangeClient client;
try {
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 由Exchange层负责进行连接操作
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
}
return client;
}
}
- DubboProtocol在refer过程中创建DubboInvoker对象,在创建DubboInvoker对象过程中会初始化ExchangeClient对象。
- 初始化ExchangeClient对象是通过Exchangers层的connect()方法实现。
Exchangers
public class Exchangers {
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// getExchanger()返回HeaderExchanger
return getExchanger(url).connect(url, handler);
}
}
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 封装关系 DecodeHandler => HeaderExchangeHandler => requestHandler
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
- HeaderExchanger内部会调用Transporters的connect()方法。
- Handler的封装关系 DecodeHandler => HeaderExchangeHandler => requestHandler。
Transporters
public class Transporters {
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取NettyTransporter对象执行connect()方法
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
- Transporters内部获取NettyTransporter对象,执行connect()方法。
- NettyTransporter的connect()方法内部构造NettyClient对象,参数listener为DecodeHandler对象。
NettyClient
public class NettyClient extends AbstractClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
// 省略其他代码
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
}
public class ChannelHandlers {
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// ExtensionLoader.getExtensionLoader()返回AllChannelHandler对象
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
- NettyClient的构造函数中通过wrapChannelHandler()方法再次封装handler。
- ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch()返回AllChannelHandler对象。
- handler的封装关系为MultiMessageHandler => HeartbeatHandler
=> AllChannelHandler => DecodeHandler => HeaderExchangeHandler => requestHandler。 - NettyClient的NettyClientHandler为NettyClient本身。
- nettyClientHandler会在NettyClient收到响应报文后开始执行。
AbstractPeer
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
private final ChannelHandler handler;
private volatile URL url;
public AbstractPeer(URL url, ChannelHandler handler) {
this.url = url;
this.handler = handler;
}
@Override
public void received(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.received(ch, msg);
}
}
AbstractPeer
- AbstractPeer是NettyClient的基类,在AbstractPeer的构造函数当中handler为MultiMessageHandler,由NettyClient的构造函数传入。
- AbstractPeer作为Client端响应入口,具体的received()方法等执行的入口,其他方法可以在实现类查看。
AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
}
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 构建executor对象
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
}
- AllChannelHandler的父类WrappedChannelHandler的构造函数中会创建executor对象。
- 每个连接会有一个executor对象,consumer侧的executor是基于连接维度的,每个connection会有对应的executor对象。
Handler封装关系
Handler封装关系Consumer Client 响应流程
Consumer Client 响应阶段一
Consumer 响应阶段一- Consumer响应阶段一的调用栈如上图。
- 按照NettyClientHandler => NettyClient =>MultiMessageHandler => HeartbeatHandler => AllChannelHandler的顺序进行调用。
NettyClientHandler
public class NettyClientHandler extends ChannelDuplexHandler {
private final URL url;
private final ChannelHandler handler;
public NettyClientHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise future)
throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.disconnected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
- NettyClientHandler的各个方法负责处理各类连接读取事件。
AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
public void received(Channel channel, Object message) throws RemotingException {
// 获取对应的executor线程池对象
ExecutorService executor = getExecutorService();
try {
// 构造ChannelEventRunnable对象并进行投递
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
public class WrappedChannelHandler implements ChannelHandlerDelegate {
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
protected final ExecutorService executor;
protected final ChannelHandler handler;
protected final URL url;
public ExecutorService getExecutorService() {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
return cexecutor;
}
}
- AllChannelHandler负责往消费端线程池投递ChannelEventRunnable对象。
- ExecutorService cexecutor = getExecutorService()获取线程池对象,每个连接一个ExecutorService对象。
Consumer Client 响应阶段二
Consumer 响应阶段二- Consumer 响应阶段二的调用栈如上图。
- 调用栈按照ChannelEventRunnable => DecodeHandler => HeaderExchangeHandler => DefaultFuture的顺序调用。
ChannelEventRunnable
public class ChannelEventRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
this.channel = channel;
this.handler = handler;
this.state = state;
this.message = message;
this.exception = exception;
}
@Override
public void run() {
if (state == ChannelState.RECEIVED) {
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
} else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
}
- ChannelEventRunnable的线程内部执行run()方法进行执行流程。
- ChannelEventRunnable的内部的handler对象为DecodeHandler对象。
- 执行DecodeHandler的内部会接着调用HeaderExchangeHandler对象方法。
HeaderExchangeHandler
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// 处理请求
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
// 处理响应
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// 处理telnet等请求
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
}
- HeaderExchangeHandler的received()内部区别请求/响应/字符串进行不同的处理。
- Consumer处理响应的逻辑在handleResponse()方法内部。
- handleResponse()方法最终执行的是DefaultFuture的方法。
DefaultFuture
public class DefaultFuture implements ResponseFuture {
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
- DefaultFuture负责保存响应对象并通过信号量唤醒消费线程。
网友评论