Dubbo Consumer Response Flow

Author: 晴天哥_王志 | Published: 2020-02-16 12:40

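Introduction

  • This article analyzes how a Dubbo Consumer handles the response sent back by a Provider. It is organized in two parts: the initialization flow of the Dubbo Client and the response flow of the Dubbo Client.

  • The initialization part focuses on how the Client connects and on how the handlers are wrapped around one another.

  • The response part focuses on the path a response takes through those handlers, so it builds directly on the initialization part.

  • Along the way, the article also explains why Dubbo 2.6.5 creates too many threads on the client side.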

Consumer Client initialization flow

DubboProtocol

public class DubboProtocol extends AbstractProtocol {

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
            // related code omitted
    };

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // getClients() initializes the Client objects while the DubboInvoker is being created
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

    private ExchangeClient initClient(URL url) {

        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        ExchangeClient client;
        try {
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                // The Exchange layer performs the actual connect
                client = Exchangers.connect(url, requestHandler);
            }
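        } catch (RemotingException e) {
            // Rethrow so that `client` is never returned uninitialized
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }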
        return client;
    }
}
  • During refer(), DubboProtocol creates a DubboInvoker, and creating the DubboInvoker initializes the ExchangeClient objects.
  • The ExchangeClient objects are created through the connect() method of the Exchangers class.
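
The shared-versus-dedicated decision made in getClients() can be isolated in a few lines. The following is a minimal sketch, not Dubbo code: the class ClientPlanSketch and the method plan() are hypothetical names, and the strings "shared" and "dedicated" merely stand in for the calls to getSharedClient(url) and initClient(url).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the decision made in getClients(); not Dubbo code.
class ClientPlanSketch {

    /** Returns one label per client slot: "shared" or "dedicated". */
    static List<String> plan(int configuredConnections) {
        boolean shareConnection = false;
        int connections = configuredConnections;
        // 0 means the "connections" parameter was not configured:
        // fall back to a single shared client.
        if (connections == 0) {
            shareConnection = true;
            connections = 1;
        }
        List<String> slots = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            slots.add(shareConnection ? "shared" : "dedicated");
        }
        return slots;
    }

    public static void main(String[] args) {
        System.out.println(plan(0)); // [shared]
        System.out.println(plan(3)); // [dedicated, dedicated, dedicated]
    }
}
```

With no connections setting, a service reuses one shared client; with connections=3, it gets three clients of its own.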

Exchangers

public class Exchangers {

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // getExchanger() returns the HeaderExchanger
        return getExchanger(url).connect(url, handler);
    }
}



public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // Wrapping order: DecodeHandler => HeaderExchangeHandler => requestHandler
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
}
  • HeaderExchanger calls the connect() method of Transporters.
  • At this point the handlers are wrapped as DecodeHandler => HeaderExchangeHandler => requestHandler.

Transporters

public class Transporters {

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // Obtain the Transporter (NettyTransporter here) and call connect()
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

}

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}
  • Transporters obtains the NettyTransporter and calls its connect() method.
  • NettyTransporter.connect() constructs a NettyClient; its listener argument is the DecodeHandler.
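
Transporters.connect() first reduces its varargs to a single handler before passing it on. Below is a small sketch of that selection rule, using hypothetical types (Handler, NoOpHandler, FanOutHandler) in place of ChannelHandler, ChannelHandlerAdapter and ChannelHandlerDispatcher; it illustrates the branching only, not the behavior of the real classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical types; they only mirror the three branches in Transporters.connect().
class HandlerSelectionSketch {
    interface Handler { String name(); }

    // Plays the role of ChannelHandlerAdapter: a do-nothing default.
    static final class NoOpHandler implements Handler {
        public String name() { return "no-op"; }
    }

    // Plays the role of ChannelHandlerDispatcher: fans out to several handlers.
    static final class FanOutHandler implements Handler {
        private final List<Handler> delegates;
        FanOutHandler(Handler... delegates) { this.delegates = Arrays.asList(delegates); }
        public String name() { return "fan-out(" + delegates.size() + ")"; }
    }

    static Handler select(Handler... handlers) {
        if (handlers == null || handlers.length == 0) {
            return new NoOpHandler();        // nothing supplied
        }
        if (handlers.length == 1) {
            return handlers[0];              // the consumer path: just the DecodeHandler
        }
        return new FanOutHandler(handlers);  // several handlers
    }
}
```

In the consumer flow described here only one handler (the DecodeHandler) is passed, so the middle branch is taken and the handler is forwarded unchanged.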

NettyClient

public class NettyClient extends AbstractClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));

    private Bootstrap bootstrap;

    private volatile Channel channel; // volatile, please copy reference to use

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

    @Override
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("handler", nettyClientHandler);
            }
        });
    }

    @Override
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try {
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
            // other code omitted
        } finally {
            if (!isConnected()) {
                //future.cancel(true);
            }
        }
    }

}


public abstract class AbstractClient extends AbstractEndpoint implements Client {

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }
}


public class ChannelHandlers {

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // With the default dispatcher, dispatch() returns an AllChannelHandler
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
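  • The NettyClient constructor wraps the handler once more through wrapChannelHandler().
  • With the default dispatcher, ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch() returns an AllChannelHandler.
  • The complete wrapping chain is therefore MultiMessageHandler => HeartbeatHandler
    => AllChannelHandler => DecodeHandler => HeaderExchangeHandler => requestHandler.
  • The handler held by the NettyClientHandler is the NettyClient itself (it is constructed with this as its second argument).
  • nettyClientHandler starts running once the NettyClient receives a response message.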
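
The wrapping order above is a chain of decorators built from the inside out. The sketch below reproduces only that order. It is an assumption-laden simplification: Handler, wrap() and deliver() are hypothetical names, and every layer simply records its name and delegates synchronously, whereas the real AllChannelHandler hands off to a thread pool and the real HeaderExchangeHandler routes responses to DefaultFuture instead of requestHandler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical decorator chain; only the nesting order matches the article.
class HandlerChainSketch {
    interface Handler { void received(String msg, List<String> trace); }

    // A named wrapper that records itself, then delegates to the wrapped handler.
    static Handler wrap(String name, Handler inner) {
        return (msg, trace) -> {
            trace.add(name);
            inner.received(msg, trace);
        };
    }

    static List<String> deliver(String message) {
        Handler requestHandler = (m, t) -> t.add("requestHandler");
        // Innermost first: HeaderExchanger adds two layers, ChannelHandlers.wrap() adds three more.
        Handler chain = wrap("MultiMessageHandler",
                wrap("HeartbeatHandler",
                wrap("AllChannelHandler",
                wrap("DecodeHandler",
                wrap("HeaderExchangeHandler", requestHandler)))));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(deliver("message"));
    }
}
```

The outermost wrapper is the last one applied during initialization and the first one to see an incoming message.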

AbstractPeer

public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    private final ChannelHandler handler;
    private volatile URL url;

    public AbstractPeer(URL url, ChannelHandler handler) {
        this.url = url;
        this.handler = handler;
    }

    @Override
    public void received(Channel ch, Object msg) throws RemotingException {
        if (closed) {
            return;
        }
        handler.received(ch, msg);
    }
}
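  • AbstractPeer is an ancestor class of NettyClient. The handler stored by its constructor is the MultiMessageHandler passed up from the NettyClient constructor.
  • AbstractPeer is the entry point for responses on the client side: received() and the other callbacks start here. The remaining methods can be found in the implementing classes.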

AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
}

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // Build the executor (thread pool)
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}
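  • The constructor of WrappedChannelHandler, the parent class of AllChannelHandler, creates the executor object.
  • On the consumer side the executor is scoped per connection: every connection gets its own executor. This is the cause of the excessive client-side thread count in Dubbo 2.6.5 mentioned in the introduction.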
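
To make the per-connection scoping concrete, here is a minimal sketch in which every handler instance builds its own pool, as the WrappedChannelHandler constructor does. ConnectionHandler, POOLS_CREATED and poolsFor() are hypothetical names, and the use of a cached thread pool is an assumption made for the illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one executor per connection handler, none shared.
class PerConnectionExecutorSketch {
    static final AtomicInteger POOLS_CREATED = new AtomicInteger();

    // Stand-in for WrappedChannelHandler: the constructor creates a new pool.
    static final class ConnectionHandler implements AutoCloseable {
        final ExecutorService executor;

        ConnectionHandler() {
            this.executor = Executors.newCachedThreadPool(); // assumed pool type
            POOLS_CREATED.incrementAndGet();
        }

        @Override
        public void close() {
            executor.shutdown();
        }
    }

    /** Opens the given number of "connections" and reports how many pools that created. */
    static int poolsFor(int connections) {
        int before = POOLS_CREATED.get();
        ConnectionHandler[] handlers = new ConnectionHandler[connections];
        for (int i = 0; i < connections; i++) {
            handlers[i] = new ConnectionHandler();
        }
        for (ConnectionHandler h : handlers) {
            h.close();
        }
        return POOLS_CREATED.get() - before;
    }
}
```

The number of pools grows linearly with the number of connections, and each pool can spawn its own threads, so a consumer connected to many providers accumulates threads quickly.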

Handler wrapping chain

[Figure: handler wrapping chain]

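Consumer Client response flow

Consumer Client response: stage one
[Figure: call stack of consumer response stage one]
  • The call stack of stage one is shown in the figure above.
  • Calls proceed in the order NettyClientHandler => NettyClient => MultiMessageHandler => HeartbeatHandler => AllChannelHandler.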
NettyClientHandler
public class NettyClientHandler extends ChannelDuplexHandler {

    private final URL url;

    private final ChannelHandler handler;

    public NettyClientHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise future)
            throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.disconnected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}
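  • The methods of NettyClientHandler handle the connection and read events raised by Netty; channelRead() forwards each incoming message to handler.received().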
AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {

    public void received(Channel channel, Object message) throws RemotingException {
        // Get the executor (thread pool) for this handler
        ExecutorService cexecutor = getExecutorService();
        try {
            // Build a ChannelEventRunnable and submit it to the pool
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}


public class WrappedChannelHandler implements ChannelHandlerDelegate {

    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
    protected final ExecutorService executor;
    protected final ChannelHandler handler;
    protected final URL url;

    public ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}
  • AllChannelHandler submits a ChannelEventRunnable to the consumer-side thread pool.
  • ExecutorService cexecutor = getExecutorService() obtains that thread pool; there is one ExecutorService per connection.
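
The fallback rule in getExecutorService() and the hand-off from the I/O thread to a pool thread can be sketched as follows. This is an illustration under assumptions, not Dubbo code: ExecutorFallbackSketch, choose() and runOn() are hypothetical names, and the thread name SketchSharedHandler is invented for the demo.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of "use my own pool, else the shared one".
class ExecutorFallbackSketch {
    // Daemon threads so the shared pool never keeps the JVM alive.
    static final ExecutorService SHARED = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "SketchSharedHandler");
        t.setDaemon(true);
        return t;
    });

    /** Mirrors getExecutorService(): fall back when the own pool is missing or shut down. */
    static ExecutorService choose(ExecutorService own) {
        if (own == null || own.isShutdown()) {
            return SHARED;
        }
        return own;
    }

    /** Submits a task from the calling thread and returns the name of the thread that ran it. */
    static String runOn(ExecutorService own) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return choose(own).submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The caller of runOn() plays the role of the Netty I/O thread: it only enqueues the work and the pool thread does the processing, which is what keeps the I/O thread free to read the next message.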
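Consumer Client response: stage two
[Figure: call stack of consumer response stage two]
  • The call stack of stage two is shown in the figure above.
  • Calls proceed in the order ChannelEventRunnable => DecodeHandler => HeaderExchangeHandler => DefaultFuture.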

ChannelEventRunnable

public class ChannelEventRunnable implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);

    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;



    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {
        this.channel = channel;
        this.handler = handler;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }

    @Override
    public void run() {
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}
  • The run() method of ChannelEventRunnable executes on a thread of the consumer-side thread pool.
  • The handler held by ChannelEventRunnable is the DecodeHandler.
  • DecodeHandler in turn calls into HeaderExchangeHandler.
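
The state dispatch in run() maps each channel state to exactly one handler callback. A compact sketch of that mapping follows; State and dispatch() are hypothetical names, and the returned strings only stand in for the calls on the real handler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one channel state, one callback.
class EventDispatchSketch {
    enum State { CONNECTED, DISCONNECTED, SENT, RECEIVED, CAUGHT }

    /** Returns the callbacks that would be invoked for the given state. */
    static List<String> dispatch(State state) {
        List<String> calls = new ArrayList<>();
        switch (state) {
            case RECEIVED:
                calls.add("received");
                break;
            case CONNECTED:
                calls.add("connected");
                break;
            case DISCONNECTED:
                calls.add("disconnected");
                break;
            case SENT:
                calls.add("sent");
                break; // without this break, SENT would also run the CAUGHT branch
            case CAUGHT:
                calls.add("caught");
                break;
            default:
                calls.add("unknown");
        }
        return calls;
    }
}
```

For a provider response the state is RECEIVED, so the only callback that runs is received() on the DecodeHandler.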

HeaderExchangeHandler

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // Handle a request
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                // Handle a response
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // Handle telnet-style string commands
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }


    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}
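  • HeaderExchangeHandler.received() distinguishes requests, responses and strings and handles each differently.
  • The Consumer's response handling lives in handleResponse().
  • handleResponse() ignores heartbeat responses and passes every other response to DefaultFuture.received().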
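
The branching in received() and handleResponse() boils down to a routing decision on the message type. The sketch below shows that decision in isolation; Request, Response and route() are hypothetical stand-ins, and the returned labels merely name the branch that the real code would take.

```java
// Hypothetical sketch of the type-based routing in HeaderExchangeHandler.received().
class MessageRoutingSketch {
    static final class Request { }

    static final class Response {
        final boolean heartbeat;
        Response(boolean heartbeat) { this.heartbeat = heartbeat; }
    }

    static String route(Object message) {
        if (message instanceof Request) {
            return "handle request";
        }
        if (message instanceof Response) {
            // Heartbeat responses carry no result, so they never reach the future.
            return ((Response) message).heartbeat ? "drop heartbeat" : "complete future";
        }
        if (message instanceof String) {
            return "telnet or error";
        }
        return "pass to inner handler";
    }
}
```

On the consumer side the interesting branch is "complete future": it is the only one that wakes up a thread waiting for an RPC result.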

DefaultFuture

public class DefaultFuture implements ResponseFuture {

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }


    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}
  • DefaultFuture stores the response object and wakes the waiting consumer thread by signalling the Condition done while holding lock.
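
The hand-over between the pool thread and the waiting caller can be sketched with a plain lock and Condition. The class below is a simplified illustration, not DefaultFuture itself: PendingCallSketch, PENDING, get() and demo() are hypothetical names, the response is reduced to a String, and callbacks are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified counterpart of DefaultFuture.
class PendingCallSketch {
    // Request id -> call still waiting for its response (the role of FUTURES).
    private static final Map<Long, PendingCallSketch> PENDING = new ConcurrentHashMap<>();

    private final long id;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private String response; // guarded by lock

    PendingCallSketch(long id) {
        this.id = id;
        PENDING.put(id, this);
    }

    /** Called on the pool thread; returns false if no caller is waiting for this id. */
    static boolean received(long id, String value) {
        PendingCallSketch call = PENDING.remove(id);
        if (call == null) {
            return false; // e.g. the caller already timed out
        }
        call.lock.lock();
        try {
            call.response = value;
            call.done.signal(); // wake the thread blocked in get()
        } finally {
            call.lock.unlock();
        }
        return true;
    }

    /** Called on the business thread; blocks until the response arrives or the timeout expires. */
    String get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) { // loop guards against spurious wakeups
                if (nanos <= 0) {
                    PENDING.remove(id);
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                nanos = done.awaitNanos(nanos);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    /** Small end-to-end run: one thread waits, another delivers the response. */
    static String demo() {
        PendingCallSketch call = new PendingCallSketch(1L);
        new Thread(() -> received(1L, "pong")).start();
        try {
            return call.get(1000);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The request id is what ties the two threads together: the caller registers under it before sending, and the response path looks it up to find the right waiter. A response whose id is no longer registered corresponds to the "timeout response finally returned" warning in the original code.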
