Dubbo Consumer Response Process

Author: 晴天哥_王志 | Published: 2020-02-16 12:40

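    Introduction

    • This article analyzes how a Dubbo Consumer handles the response from a Provider. It is organized in two parts: the initialization flow of the Dubbo Client and the response flow of the Dubbo Client.

    • The initialization part focuses on how the Client connects and how its Handlers are wrapped.

    • The response part focuses on how a response is processed; the whole flow builds on the structures created during initialization.

    • Along the way, the article explains why the Client side of Dubbo 2.6.5 ends up with too many threads.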

    Consumer Client Initialization Flow

    DubboProtocol

    public class DubboProtocol extends AbstractProtocol {
    
        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
                // related code omitted
        };
    
        public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
            // getClients() initializes the Client objects while the DubboInvoker is being created
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
    
        private ExchangeClient[] getClients(URL url) {
            // whether to share connection
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                service_share_connect = true;
                connections = 1;
            }
    
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect) {
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
    
        private ExchangeClient initClient(URL url) {
    
            String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    
            ExchangeClient client;
            try {
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
                } else {
                    // the Exchange layer performs the actual connect
                    client = Exchangers.connect(url, requestHandler);
                }
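            } catch (RemotingException e) {
                // rethrow: with an empty catch block, `client` might be unassigned at the return below
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }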
            return client;
        }
    }
    
    • During refer(), DubboProtocol creates a DubboInvoker, and creating the DubboInvoker initializes the ExchangeClient objects.
    • The ExchangeClient objects are initialized through the connect() method of the Exchangers class.
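    The branch in getClients() can be illustrated with a small standalone sketch. Everything here is hypothetical scaffolding (`Client`, `SHARED_CLIENTS`, and the assumption that a shared client is cached per provider address); it only mirrors the rule visible in the listing above: `connections == 0` means one shared client, anything else means that many dedicated clients.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "shared vs. dedicated connection" decision; not Dubbo's own code.
final class ClientSelectionSketch {

    // Stand-in for ExchangeClient.
    static final class Client {
        final String address;

        Client(String address) {
            this.address = address;
        }
    }

    // Assumption: shared clients are cached per provider address.
    private static final Map<String, Client> SHARED_CLIENTS = new ConcurrentHashMap<>();

    static Client[] getClients(String address, int connections) {
        boolean share = connections == 0;                 // not configured => share
        Client[] clients = new Client[share ? 1 : connections];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = share
                    ? SHARED_CLIENTS.computeIfAbsent(address, Client::new)
                    : new Client(address);
        }
        return clients;
    }

    public static void main(String[] args) {
        Client[] first = getClients("10.0.0.1:20880", 0);
        Client[] second = getClients("10.0.0.1:20880", 0);
        System.out.println(first[0] == second[0]);                  // same shared instance
        System.out.println(getClients("10.0.0.1:20880", 3).length); // three dedicated clients
    }
}
```

    With this rule, two services that talk to the same provider without an explicit connections setting end up on one connection, while a service configured with N connections gets N of its own.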

    Exchangers

    public class Exchangers {
    
        public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
            // getExchanger() returns a HeaderExchanger
            return getExchanger(url).connect(url, handler);
        }
    }
    
    
    
    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            // wrapping order: DecodeHandler => HeaderExchangeHandler => requestHandler
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
    }
    
    • HeaderExchanger calls Transporters.connect() internally.
    • The Handlers are wrapped as DecodeHandler => HeaderExchangeHandler => requestHandler.

    Transporters

    public class Transporters {
    
        public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            ChannelHandler handler;
            if (handlers == null || handlers.length == 0) {
                handler = new ChannelHandlerAdapter();
            } else if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            // obtain the NettyTransporter and call its connect() method
            return getTransporter().connect(url, handler);
        }
    
        public static Transporter getTransporter() {
            return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
        }
    
    }
    
    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        @Override
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    
    }
    
    • Transporters obtains the NettyTransporter and calls its connect() method.
    • NettyTransporter.connect() constructs a NettyClient; the listener argument is the DecodeHandler.

    NettyClient

    public class NettyClient extends AbstractClient {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    
        private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
    
        private Bootstrap bootstrap;
    
        private volatile Channel channel; // volatile, please copy reference to use
    
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    
        @Override
        protected void doOpen() throws Throwable {
            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
            bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                    .channel(NioSocketChannel.class);
    
            if (getConnectTimeout() < 3000) {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
            } else {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
            }
    
            bootstrap.handler(new ChannelInitializer() {
    
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyClientHandler);
                }
            });
        }
    
        @Override
        protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try {
                boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
                // other code omitted
            } finally {
                if (!isConnected()) {
                    //future.cancel(true);
                }
            }
        }
    
    }
    
    
    public abstract class AbstractClient extends AbstractEndpoint implements Client {
    
        protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
            url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
            url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
            return ChannelHandlers.wrap(handler, url);
        }
    }
    
    
    public class ChannelHandlers {
    
        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            // by default, the adaptive Dispatcher's dispatch() returns an AllChannelHandler
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
        }
    }
    
    • The NettyClient constructor wraps the handler once more through wrapChannelHandler().
    • ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch() returns an AllChannelHandler.
    • The resulting wrapping order is MultiMessageHandler => HeartbeatHandler
      => AllChannelHandler => DecodeHandler => HeaderExchangeHandler => requestHandler.
    • The handler held by NettyClientHandler is the NettyClient itself (see new NettyClientHandler(getUrl(), this)).
    • nettyClientHandler starts executing once the NettyClient receives a response message.

    AbstractPeer

    public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    
        private final ChannelHandler handler;
        private volatile URL url;
    
        public AbstractPeer(URL url, ChannelHandler handler) {
            this.url = url;
            this.handler = handler;
        }
    
        @Override
        public void received(Channel ch, Object msg) throws RemotingException {
            if (closed) {
                return;
            }
            handler.received(ch, msg);
        }
    }
    
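    • AbstractPeer is a base class of NettyClient. The handler stored by the AbstractPeer constructor is the MultiMessageHandler, passed up from the NettyClient constructor.
    • AbstractPeer is the entry point for responses on the Client side: received() and the other callbacks start here. The remaining methods can be found in the implementation classes.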

    AllChannelHandler

    public class AllChannelHandler extends WrappedChannelHandler {
    
        public AllChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
    }
    
    public class WrappedChannelHandler implements ChannelHandlerDelegate {
    
        protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
        protected final ExecutorService executor;
        protected final ChannelHandler handler;
        protected final URL url;
    
        public WrappedChannelHandler(ChannelHandler handler, URL url) {
            this.handler = handler;
            this.url = url;
            // build the executor object
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    
            String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
            if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
                componentKey = Constants.CONSUMER_SIDE;
            }
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
        }
    }
    
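    • The constructor of WrappedChannelHandler, the parent class of AllChannelHandler, creates the executor object.
    • On the consumer side the executor is scoped to the connection: every connection gets its own executor object, so the number of thread pools grows with the number of connections.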
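    To make the per-connection scope concrete, here is a minimal sketch. `WrappedHandler` and `Client` are hypothetical stand-ins, and the cached thread pool is an assumption about what the default client thread pool resolves to; the point is only that constructing the handler inside each client yields one pool per connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: one executor is created per handler, hence per connection.
final class PerConnectionExecutorSketch {

    // Stand-in for WrappedChannelHandler: the constructor creates its own pool.
    static final class WrappedHandler {
        final ExecutorService executor = Executors.newCachedThreadPool();
    }

    // Stand-in for a client connection that wraps its own handler.
    static final class Client {
        final WrappedHandler handler = new WrappedHandler();
    }

    // Opens the given number of connections and counts the distinct executors behind them.
    static int distinctExecutors(int connections) {
        List<Client> clients = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            clients.add(new Client());
        }
        long distinct = clients.stream().map(c -> c.handler.executor).distinct().count();
        clients.forEach(c -> c.handler.executor.shutdown());
        return (int) distinct;
    }

    public static void main(String[] args) {
        System.out.println(distinctExecutors(5)); // prints 5
    }
}
```

    Because a cached pool has no upper bound of its own, a consumer connected to many providers can accumulate threads in every one of these pools at once.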

    Handler Wrapping Relationships

    [Figure: Handler wrapping relationships]
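    The wrapping chain described in the sections above can be reproduced with a few lines of plain Java. This is a simplified sketch: `Handler` and `Wrapper` are hypothetical types, every wrapper only records its name, and the call runs synchronously on one thread, whereas the real AllChannelHandler hands the message to a thread pool.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Dubbo's ChannelHandler wrappers.
final class HandlerChainSketch {

    interface Handler {
        void received(String message, List<String> trace);
    }

    // A decorator that records its own name and then delegates to the wrapped handler.
    static final class Wrapper implements Handler {
        private final String name;
        private final Handler next;

        Wrapper(String name, Handler next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void received(String message, List<String> trace) {
            trace.add(name);
            next.received(message, trace);
        }
    }

    // Builds the chain innermost first, in the order the wrapping happens during initialization.
    static Handler buildChain() {
        Handler requestHandler = (message, trace) -> trace.add("requestHandler");
        Handler exchange = new Wrapper("HeaderExchangeHandler", requestHandler);
        Handler decode = new Wrapper("DecodeHandler", exchange);   // HeaderExchanger.connect()
        Handler all = new Wrapper("AllChannelHandler", decode);    // Dispatcher.dispatch()
        Handler heartbeat = new Wrapper("HeartbeatHandler", all);  // ChannelHandlers.wrapInternal()
        return new Wrapper("MultiMessageHandler", heartbeat);
    }

    // Sends one message through the chain and returns the names in call order.
    static List<String> traceOf(String message) {
        List<String> trace = new ArrayList<>();
        buildChain().received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" => ", traceOf("response")));
    }
}
```

    The handler that was wrapped last (MultiMessageHandler) is the first one to see an incoming message, and requestHandler, wrapped first, is the last.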

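    Consumer Client Response Flow

    Consumer Client Response, Stage One
    [Figure: call stack of Consumer response stage one]
    • The figure above shows the call stack for stage one.
    • The calls proceed in the order NettyClientHandler => NettyClient => MultiMessageHandler => HeartbeatHandler => AllChannelHandler.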
    NettyClientHandler
    public class NettyClientHandler extends ChannelDuplexHandler {
    
        private final URL url;
    
        private final ChannelHandler handler;
    
        public NettyClientHandler(URL url, ChannelHandler handler) {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handler == null) {
                throw new IllegalArgumentException("handler == null");
            }
            this.url = url;
            this.handler = handler;
        }
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelInactive();
        }
    
        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise future)
                throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                handler.disconnected(channel);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                handler.received(channel, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    }
    
    • Each method of NettyClientHandler handles one kind of connection or read event.
    AllChannelHandler
    public class AllChannelHandler extends WrappedChannelHandler {
    
        public void received(Channel channel, Object message) throws RemotingException {
            // obtain the thread pool (executor) for this handler
            ExecutorService cexecutor = getExecutorService();
            try {
                // build a ChannelEventRunnable and submit it to the pool
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    Request request = (Request)message;
                    if(request.isTwoWay()){
                        String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
    }
    
    
    public class WrappedChannelHandler implements ChannelHandlerDelegate {
    
        protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
        protected final ExecutorService executor;
        protected final ChannelHandler handler;
        protected final URL url;
    
        public ExecutorService getExecutorService() {
            ExecutorService cexecutor = executor;
            if (cexecutor == null || cexecutor.isShutdown()) {
                cexecutor = SHARED_EXECUTOR;
            }
            return cexecutor;
        }
    }
    
    • AllChannelHandler submits a ChannelEventRunnable to the consumer-side thread pool.
    • ExecutorService cexecutor = getExecutorService() obtains the thread pool; there is one ExecutorService per connection.
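    The fallback in getExecutorService() and the hand-off to a pool thread can be tried out in isolation. The sketch below is an illustration under assumptions: `namedPool`, `choose` and `runOn` are hypothetical helpers, and the thread names are made up so that the pool that ran the task is visible.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of "use the connection's pool, fall back to a shared pool".
final class ExecutorFallbackSketch {

    // Creates a cached pool whose daemon threads all carry the given name.
    static ExecutorService namedPool(String name) {
        return Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    static final ExecutorService SHARED = namedPool("shared-handler");

    // Same rule as getExecutorService(): a missing or shut-down pool is replaced by the shared one.
    static ExecutorService choose(ExecutorService own) {
        return (own == null || own.isShutdown()) ? SHARED : own;
    }

    // Submits a task and returns the name of the thread that actually ran it.
    static String runOn(ExecutorService own) throws Exception {
        Callable<String> task = () -> Thread.currentThread().getName();
        return choose(own).submit(task).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService own = namedPool("connection-handler");
        System.out.println(runOn(own)); // prints connection-handler
        own.shutdown();
        System.out.println(runOn(own)); // prints shared-handler
    }
}
```

    In both cases the task runs on a pool thread rather than on the calling thread, which is the purpose of the dispatch step: the Netty I/O thread is freed as soon as the runnable has been submitted.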
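    Consumer Client Response, Stage Two
    [Figure: call stack of Consumer response stage two]
    • The figure above shows the call stack for stage two.
    • The calls proceed in the order ChannelEventRunnable => DecodeHandler => HeaderExchangeHandler => DefaultFuture.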

    ChannelEventRunnable

    public class ChannelEventRunnable implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);
    
        private final ChannelHandler handler;
        private final Channel channel;
        private final ChannelState state;
        private final Throwable exception;
        private final Object message;
    
    
    
        public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
            this.channel = channel;
            this.handler = handler;
            this.state = state;
            this.message = message;
            this.exception = exception;
        }
    
        @Override
        public void run() {
            if (state == ChannelState.RECEIVED) {
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            } else {
                switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    // break added here; the listing as quoted omits it, so SENT would fall through into CAUGHT
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
                }
            }
    
        }
    }

    • A pool thread executes ChannelEventRunnable.run(), which dispatches on the channel state.
    • The handler held by ChannelEventRunnable is the DecodeHandler.
    • DecodeHandler in turn calls the methods of HeaderExchangeHandler.

    HeaderExchangeHandler

    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    
        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle a request
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    // handle a response
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    // handle telnet and similar string requests
                    if (isClientSide(channel)) {
                        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                        logger.error(e.getMessage(), e);
                    } else {
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    
    
        static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }
    }
    
    • HeaderExchangeHandler.received() distinguishes requests, responses and strings and handles each differently.
    • The Consumer's response handling lives in handleResponse().
    • handleResponse() ultimately calls DefaultFuture.received().

    DefaultFuture

    public class DefaultFuture implements ResponseFuture {
    
        public static void received(Channel channel, Response response) {
            try {
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    future.doReceived(response);
                } else {
                    logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                            + " -> " + channel.getRemoteAddress()));
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        }
    
    
        private void doReceived(Response res) {
            lock.lock();
            try {
                response = res;
                if (done != null) {
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
            if (callback != null) {
                invokeCallback(callback);
            }
        }
    }
    
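    • DefaultFuture stores the response object and wakes the waiting consumer thread by signalling the Condition `done` while holding the lock (a lock/Condition pair, not a semaphore).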
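    The correlation between a response and the thread waiting for it can be sketched as follows. This is a minimal illustration, not DefaultFuture itself: the class name, the String payload and the `get(timeoutMillis)` method are assumptions; only the pattern (a map from request id to pending future, plus a lock and Condition used to wake the caller) follows the listing above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a pending-response future keyed by request id.
final class PendingFutureSketch {

    private static final Map<Long, PendingFutureSketch> FUTURES = new ConcurrentHashMap<>();

    private final long id;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile String response;

    PendingFutureSketch(long id) {
        this.id = id;
        FUTURES.put(id, this);
    }

    // Caller side: block until the response arrives or the timeout elapses.
    String get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) {
                if (remainingNanos <= 0) {
                    FUTURES.remove(id);
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                remainingNanos = done.awaitNanos(remainingNanos);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Handler side: look up the future by id, store the value, wake the waiter.
    // Returns false when no future is pending, e.g. for a response that arrives after a timeout.
    static boolean received(long id, String value) {
        PendingFutureSketch future = FUTURES.remove(id);
        if (future == null) {
            return false;
        }
        future.lock.lock();
        try {
            future.response = value;
            future.done.signal();
        } finally {
            future.lock.unlock();
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        PendingFutureSketch future = new PendingFutureSketch(1L);
        new Thread(() -> received(1L, "pong")).start();
        System.out.println(future.get(1000)); // prints pong
    }
}
```

    Removing the entry from the map before signalling is what makes a late or duplicate response harmless: the second lookup finds nothing and is only reported, as in the warning branch of the listing above.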

        Article link: https://www.haomeiwen.com/subject/imrefhtx.html