美文网首页
dubbo源码4-protocol

dubbo源码4-protocol

作者: modou1618 | 来源:发表于2019-01-03 14:19 被阅读0次

    一 介绍

    1.1初始化

    初始化.png

    1.2 支持协议

    dubbo协议层支持多种协议类型


    协议层类关系图.png
    • 协议层对<dubbo:reference/>配置的处理是把对接口代理的调用转换成响应的协议报文格式。

    • 协议层对<dubbo:service/>配置的处理是把响应协议报文转换成服务端接口实现类的调用。

    • RedisProtocol和MemcachedProtocol是提供把对redis和memcach的客户端调用访问封装成接口调用格式的协议。

    • InjvmProtocol是提供进程内的接口访问和服务提供。

    • AbstractProxyProtocol下的几个实现类都是基于各自的接口库来实现协议接口转换

    • ThriftProtocol支持thrift协议格式的接口转换

    • DubboProtocol支持dubbo协议格式的接口转换

    本文主要介绍dubbo协议。

    二 DubboProtocol.refer()

    <dubbo:reference/>配置在协议层的处理,主要是创建client和服务端建立连接。

    • 注册配置的序列化类
    • 初始化DubboInvoker,提供给cluster层,在接口调用时使用。
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
            return invoker;
        }
    

    在DubboInvoker中主要看两个成员

    private final ExchangeClient[] clients;
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    
    • clients数组表示和服务端的连接结合,用于数据通信.
      connections配置表示和服务端的最大连接数,即clients数组长度。
      connections值为0或未配置则表示和服务端只建立一个连接,所有通信都走这个连接,即共享客户端。
    • index,如果和服务端有多个连接,则依赖index递增轮询的方式选择用于通信的client。
      currentClient = clients[index.getAndIncrement() % clients.length];

    2.1 通信客户端

    2.1.1 共享客户端

    共享客户端涉及到的属性如下:

    //共享客户端存储
    private final Map<String /* url地址*/, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); 
    // referenceClientMap变更使用的锁
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    // <host:port,Exchanger>
    private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
    
    
    • ReferenceCountExchangeClient 类内部使用引用计数的方式记录共享的数量。
    • ghostClientMap在client连接断开后临时缓存该client,使用的是LazyConnectExchangeClient存储。避免断开后还有数据需要发送。
    • LazyConnectExchangeClient 在每次数据传输前,先判断tcp连接状态,若连接断开则先执行connect建立连接。
        private void initClient() throws RemotingException {
            if (client != null)
                return;
            if (logger.isInfoEnabled()) {
                logger.info("Lazy connect to " + url);
            }
            connectLock.lock();
            try {
                if (client != null)
                    return;
                this.client = Exchangers.connect(url, requestHandler);
            } finally {
                connectLock.unlock();
            }
        }
    
        public ResponseFuture request(Object request) throws RemotingException {
            warning(request);
            initClient();
            return client.request(request);
        }
    
    

    通信使用的ExchangeClient和并发客户端一样都通过initClient()创建
    ExchangeClient exchangeClient = initClient(url);

    2.1.2 并发客户端

    initClient()创建通信的client。

    • 配置通信框架,默认使用netty
    • 配置编解码类型为dubbo, 表示DubboCountCodec。
      dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
    • 配置连接心跳时间,默认60s
    • 若url配置lazy=false, 则调用Exchangers.connect(url, requestHandler)建立tcp连接,获取client。
    • 若url配置lazy=true,则初始化为LazyConnectExchangeClient,内部存储Exchangers.connect(url, requestHandler)获取的client。requestHandler为收包处理函数。
        private ExchangeClient initClient(URL url) {
            // client type setting.
            String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    
            // BIO is not allowed since it has severe performance issue.
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
                throw new RpcException("Unsupported client type: " + str + "," +
                        " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
            }
    
            ExchangeClient client;
            try {
                // connection should be lazy
                if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                    client = new LazyConnectExchangeClient(url, requestHandler);
                } else {
                    client = Exchangers.connect(url, requestHandler);
                }
            } catch (RemotingException e) {
                throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
            }
            return client;
        }
    

    三 consumer端调用

    返回给cluster层的是DubboInvoker,类继承关系如下

    类继承.png
    接口调用时InvokerInvocationHandler中
    invoker.invoke(new RpcInvocation(method, args))

    3.1 协议层调用

    • invoke(Invocation inv)
      1.对参数RpcInvocation做一些初始化工作
      2.RpcContext.getContext().getAttachments() 可以向服务端传输一些用户自定义的上下文信息。
      3.method的配置项async表示是否异步调用
      4.调用doInvoke()做实际的调用。
    public Result invoke(Invocation inv) throws RpcException {
            // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
            if (destroyed.get()) {
                logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                        + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
            }
    
            RpcInvocation invocation = (RpcInvocation) inv;
            invocation.setInvoker(this);
            if (attachment != null && attachment.size() > 0) {
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                /**
                 * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
                 * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
                 * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
                 * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
                 */
                invocation.addAttachments(contextAttachments);
            }
            if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
                invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            try {
                return doInvoke(invocation);
            } catch (InvocationTargetException e) { // biz exception
                Throwable te = e.getTargetException();
                if (te == null) {
                    return new RpcResult(e);
                } else {
                    if (te instanceof RpcException) {
                        ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                    }
                    return new RpcResult(te);
                }
            } catch (RpcException e) {
                if (e.isBiz()) {
                    return new RpcResult(e);
                } else {
                    throw e;
                }
            } catch (Throwable e) {
                return new RpcResult(e);
            }
        }
    
    • doInvoke(final Invocation invocation)


      调用.png

    四 DubboProtocol.expport()

    <dubbo:service/>配置在协议层的处理,主要是建立tcp server;注册协议层的收包处理函数,函数内调用服务提供者;等待客户端连接。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
    
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
            ...
            openServer(url);
            optimizeSerialization(url);
            return exporter;
        }
    
    • ConcurrentMap<String/*接口信息*/, Exporter<?>> exporterMap 存储接口的DubboExporter对象,DubboExporter.invoker为接口服务实现者。
    • openServer 创建接口对应的ExchangeServer。
    • optimizeSerialization注册序列化

    4.1 创建server

    • 默认心跳报文间隔60s, 默认传输框架netty,默认编解码dubboCodec,默认连接关闭后发送readonly事件。
    • ExchangeServer server = Exchangers.bind(url, requestHandler); 创建server,绑定请求处理函数requestHandler
    private ExchangeServer createServer(URL url) {
            // send readonly event when server closes, it's enabled by default
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            // enable heartbeat by default
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
    

    五 收包处理函数

    收包处理函数.png

    5.1 连接和断连回调

    • channel连接或断开连接事件处理函数中调用invoker()。参数methodKey为配置的对应事件回调函数的key值,为onconnect和ondisconnect。
    • createInvocation()创建一个对配置的回调函数服务调用的请求体RpcInvocation。
    • 调用正常请求报文处理函数reply()
    @Override
            public void connected(Channel channel) throws RemotingException {
                invoke(channel, Constants.ON_CONNECT_KEY);
            }
    
            @Override
            public void disconnected(Channel channel) throws RemotingException {
                if (logger.isInfoEnabled()) {
                    logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
                }
                invoke(channel, Constants.ON_DISCONNECT_KEY);
            }
    
            private void invoke(Channel channel, String methodKey) {
                Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
                if (invocation != null) {
                    try {
                        received(channel, invocation);
                    } catch (Throwable t) {
                        logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                    }
                }
            }
    
            private Invocation createInvocation(Channel channel, URL url, String methodKey) {
                String method = url.getParameter(methodKey);
                if (method == null || method.length() == 0) {
                    return null;
                }
                RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
                invocation.setAttachment(Constants.PATH_KEY, url.getPath());
                invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
                invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
                invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
                if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                    invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
                }
                return invocation;
            }
    

    5.2 报文接收处理

    • Invocation类型表示是请求报文,调用reply处理
    public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
                } else {
                    super.received(channel, message);
                }
            }
    
    • getInvoker(),从本地缓存exporterMap中获取接口服务提供者。
    • 回调类型的函数,需要接口服务提供对应回调函数的实现,否则报错。
    • 调用method对应的接口服务。
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // need to consider backward-compatibility if it's a callback
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                    + " not found in callback service interface ,invoke will be ignored."
                                    + " please update the api interface. url is:"
                                    + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
    

    六 filter与listener

    在registryProtocol和dubboProtocol中间有一层filter和listener,用于添加对调用流程的过滤和监听处理。如之前提到的echofilter,ActiveLimitFilter。


    image.png

    6.1 provider端filter

    • TpsLimitFilter ups限制过滤
    • TokenFilter consumer的token验证, invocation.attachments传递token
    • TimeoutFilter 超时过滤,超时则纪录warn日志
    • GenericFilter 通用调用转换成实际接口调用
    • ExecuteLimitFilter provider端执行接口调用的线程数限制过滤
    • ExceptionFilter 异常过滤处理
    • EchoFilter 回响调用过滤, consumer端AbstractProxyFactory获取接口代理时,添加echoService接口到代理中。
    • ContextFilter 更新provider端的RpcContext,获取invocation的attachments附加参数
    • AccessLogFilter 接口调用日志写入本地,异步线程写日志
    • ClassLoaderFilter 接口调用时更新为invoker的classLoader,调用完成后恢复原来的loader

    6.2 consumer端tilter

    • GenericImplFilter 通用调用过滤,普通接口调用转换成通用调用 $invoke(methodName, paramsType, paramsArgs)
    • ActiveLimitFilter consumer端调用接口的并发数限制,超过则等待直到超时
    • ConsumerContextFilter 更新consumer端的RpcContext

    6.3 共用filter

    • CompatibleFilter 根据序列化类型对结果做转换处理,或做类型转换

    相关文章

      网友评论

          本文标题:dubbo源码4-protocol

          本文链接:https://www.haomeiwen.com/subject/ompxrqtx.html