美文网首页
详解Dubbo(四):消费端请求发送Exchanger

详解Dubbo(四):消费端请求发送Exchanger

作者: 空挡 | 来源:发表于2020-05-04 22:07 被阅读0次

前言

前两篇文章讲了消费端代理的生成,最终到请求发送操作由Invoker来完成。Invoker同时集成了集群服务发现和路由功能,还集成了调用过程中的自定义扩展Filter。Invoker是业务对象的分水岭,请求到达Invoker之前,都是以业务接口和方法的方式调用,就是说调用方要拿到接口定义的api。Invoker之后就是Exchanger和Transporter层,只存在Request/Response了,在这里接口和方法变成了Request的一个参数。这篇文章先看一下Dubbo是怎么初始化远程通信Client并发送和接收请求的。下一篇将解析通信协议以及序列化等操作的实现。

Client初始化

回顾之前的白话Dubbo系列,Invoker调用的是Exchanger。Exchange层针对消费端和服务提供端分布封装成ExchangeClientExchangeServer。它们大部分接口都是一样的,只是对于Client来说,支持connect()操作来和提供端建立连接;而对于Server端,需要通过bind操作来监听端口,来接收消费端的连接请求。其它的数据发送和接收对于两端来说其实是一样的。

Dubbo Client初始化

还是以Dubbo协议为例,回顾下上一篇DubboProtocol的Invoker初始化操作:

@Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        ...
        ...
        // 创建Invoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

在构造Invoker的时候,需要传入一个Client的列表,之后Invoker通过client发送请求和接收返回结果,至于请求协议打包、连接建立、异步io的结果接收等操作留给ExchangeClient来实现。下面看下getClients()方法的实现:

private ExchangeClient[] getClients(URL url) {
        boolean useShareConnect = false;
        //连接数配置
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // 如果没设置连接数,说明Consumer希望使用共享连接
        if (connections == 0) {
            useShareConnect = true;
            //共享连接数配置
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            //获取共享Client
            shareClients = getSharedClient(url, connections);
        }
        //根据配置的连接个数初始化Client
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                //使用共享client
                clients[i] = shareClients.get(i);
            } else {
                //初始化Client
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

上面的逻辑中涉及到共享Client的概念,因为Dubbo的服务是以接口为粒度的,每个Invoker对应了一个远程接口的调用封装。在实际应用中,一个应用会包含多个接口,如果对应同一个应用的多个Invoker每个都初始化一个Client的话,万一接口过多,会造成每个Consumer和Provider之间建立多个Connection,而且连接数随着consumer的个数增加而成倍数的增加。所以shareClient的意思就是对于同一个ip+port,所有invoker共享client,有点类似于连接池的概念,达到节约资源的目的。共享client最终初始化client的方式和普通的是一样的,所以这里直接看下initClient()方法是如何实现的。

private ExchangeClient initClient(URL url) {
        // client通信框架,默认netty
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
        //使用Dubbo通信协议
        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // 设置发送心跳的间隔
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
        // 检查传输层是否支持该框架
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // client延迟建立连接,即第一次调用时才connect,已经不推荐使用
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                //初始化并连接server
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

上面的初始化过程除了初始化参数外,就是调用工具类Exchangers来初始化client。这里面除了url之外还有一个ExchangeHandler参数,这个是用来处理服务端主动发送来的消息用的,对于Consumer发送请求的场景这里涉及不到。看下Exchangers.connect()是怎么实现的。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

具体实现就是根据url参数获取对应的Exchanger,然后调用它的connect方法。Dubbo中默认实现类是HeaderExchanger

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

上面实现中,通过调用connect()方法获取到ExchangeClient的实现HeaderExchangeClient,这里会通过工具类Transporters获取到一个传输层的Client对象。获取Client对象时,对传入的handler,又做了两层封装,DecodeHandler用来对收到的Response中的数据部分解码成对象;而HeaderExchangeHandler作用是对于异步返回的Response找到当时的Request,将结果返给当初的调用方。

HeaderExchangeClient初始化

public HeaderExchangeClient(Client client, boolean startTimer) {
        Assert.notNull(client, "Client can't be null");
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);

        if (startTimer) {
            URL url = client.getUrl();
            startReconnectTask(url);
            startHeartBeatTask(url);
        }
    }

由上面Client的构造函数可以看出主要做了两件事,首先将传入的client实现封装了一层,client的方法比如request()都是直接调用的channel的方法;其次启动了两个定时任务,一个是在和server端的connection断开后重连,一个是定时发送心跳。下面看下HeaderExchangeChannel对client做了一层封装后,主要干了什么。

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }
    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

这个类在发送前,将调用参数封装成一个request,设置版本号和请求类型。如果调用的是request()方法,即需要接收response,以异步请求的方式发出,返回给调用方(也就是Invoker)一个future。

请求发送过程

现在理一下一次请求的发送过程。在系统初始化阶段,创建Invoker的时候会初始化一个和服务端交互的ExchangeClient,对于Dubbo协议来说,就是DubboInvoker中包含一个或者多个HeaderExchangeClient。当代理调用Invoker的invoke()方法发送请求时,invoker选择一个Client发送request。如果是OneWay的请求,调用send方法,直接返回成功或者失败的结果。如果是TwoWay的请求,Client会返回一个Future给invoker,然后client在收到response后,会将收到的结果set到Future中,调用方就可以拿到远程接口的返回值了。

接收请求响应

在上面Invoker初始化Client的时候,需要传入一个ExchangeHandler用来接收异步响应回调。在HeaderExchangeClient初始化的时候,又在handler上面套了两层,所以最终的关系图大概时下面这样:

请求流转

在底层的Transporter收到Server端的数据并处理后,会将数据给到DecodeHandler,这个handler判断返回的数据是否实现了Decodeable接口,是的话就调用decode()方法并把解码出的value设置到Response中。

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }
        //provider端解码Request
        if (message instanceof Request) {
            decode(((Request) message).getData());
        }
       //consumer端解码Response
        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }
       //调用下一个handler
        handler.received(channel, message);
    }

第二个handler是HeaderExchangeHandler,如果是Server端返回的请求响应,最终会到handleResponse()方法中:

static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

还记得上面的请求发送过程吗?对于TwoWay的请求,request发出后,调用方会得到一个Future,这个Future就是在这个Handler这里填充结果的。这样一次完整的请求就完成了。

总结

Invoker将一次远程方法的调用封装成Request后,通过ExchangeClient发送出去,并通过传入的ExchangeHandler参数处理异步返回的Response并和之前的Request关联,返回给调用方。Dubbo这里对于Exchange和Transporter的划分使用了MEP设计模式(Message Exchange Pattern),感兴趣的话可以看下这个模式的定义。

相关文章

网友评论

      本文标题:详解Dubbo(四):消费端请求发送Exchanger

      本文链接:https://www.haomeiwen.com/subject/ulbkghtx.html