美文网首页
Dubbo2.7的泛化调用和超时机制

Dubbo2.7的泛化调用和超时机制

作者: 九点半的马拉 | 来源:发表于2021-03-19 16:58 被阅读0次

    路途虽遥远,将来更美好
    微信公号号:九点半的马拉

    在传统模式下,Dubbo消费端需要调用某一远程服务器端的方法时,消费端也需要额外导入服务类接口API,Dubbo也由此实现了面向接口代理的高性能RPC调用。

    但是当服务消费端没有服务接口或方法参数类型时,无法使用上述的方式进行服务调用,针对该场景,Dubbo使用泛化调用方法进行服务调用。

    Dubbo在进行泛化调用时,将相关信息封装到Map对象中,并利用GenericService接口处理。

    举个例子

    服务器端配置:

    <bean id="helloserviceimpl" class="org.apache.dubbo.samples.generic.call.impl.HelloServiceImpl"/>
    
    <dubbo:service interface="org.apache.dubbo.samples.generic.call.api.HelloService" ref="helloserviceimpl"/>
    

    服务器端服务具体实现类:

    public class HelloServiceImpl implements HelloService {
        @Override
        public CompletableFuture<String> sayHelloAsync(String name) {
            CompletableFuture<String> future = new CompletableFuture<>();
            new Thread(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                future.complete("sayHelloAsync: " + name);
            }).start();
            return future;
        }
    

    消费端配置:

    public class GenericCallConsumer {
        private static GenericService genericService;
        public static void main(String[] args) throws Exception {
            ApplicationConfig applicationConfig = new ApplicationConfig();
            applicationConfig.setName("generic-call-consumer");
            RegistryConfig registryConfig = new RegistryConfig();
            registryConfig.setAddress("zookeeper://127.0.0.1:2181");
            ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
        referenceConfig.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");
            applicationConfig.setRegistry(registryConfig);
            referenceConfig.setApplication(applicationConfig);
            // 开启泛化
            referenceConfig.setGeneric(true);
            referenceConfig.setAsync(true);
            referenceConfig.setTimeout(7000);
            genericService = referenceConfig.get();
            invokeAsyncSayHelloAsync();
    
    public static void invokeAsyncSayHelloAsync() throws Exception {
            CompletableFuture<Object> future = genericService.$invokeAsync("sayHelloAsync",
                    new String[]{"java.lang.String"}, new Object[]{"world"});
            CountDownLatch latch = new CountDownLatch(1);
            future.whenComplete((value, t) -> {
               System.err.println("invokeAsyncSayHelloAsync(whenComplete): " + value);
                latch.countDown();
            });
            latch.await();
    }
    

    原理分析

    消费端通过一个代理对象进行服务调用,

    1)执行InvokerInvocationHandler#invoke方法,

    2)调用MockClusterInvoker#invoke方法

    3)调用AbstractCluster$InterceptorInvokerNode#invoke方法

    code1.png

    这里新增了一个ClusterInterceptor,与Filter不同,它在一个特定的地址或invoker被选择之前的较外层执行逻辑,在服务发现之前拦截请求。

    3.1)调用ConsumerContextClusterInterceptor#before方法。
    从RpcContext设置invocation,并设置localAddress和invoker(默认FailoverClusterInvoker),清除RpcContext内部的SERVER_LOCAL上下文内容。

    3.2)调用ClusterInterceptor#intercept方法

    code2.png

    默认调用FailoverClusterInvoker#doinvoke方法。

    在该方法中从RegistryDirectory中获取invoker列表,然后获取负载均衡LoadBalance(默认RandomLoadBalance), 选择一个invoker,进行服务调用。

    4)调用InvokerWrapper#invoke方法,之后执行一个Filter链

    4.1)调用ConsumerContextFilter#invoke方法,

    设置RpcContext中的invoker变量问当前invoker(默认是ProtocolFilterWrapper),设置invocation。

    从RpcContext中获取timeout-countdown变量,如果存在,则转化为TimeCountDown对象,判断该调用是否超时,如果超时,则返回一个AsyncRpcResult对象,记录一个异常。

    4.2)调用FutureFilter#invoke方法和MonitorFilter#invoke方法。

    4.3)调用GenericImplFilter#invoke方法,这里是泛化调用在客户端的主要核心步骤

    4.3.1)从url中获取generic字段,调用方法不是$invoke,也不是$invokeAysnc时:

    4.3.1.1)重新创建一个RpcInvocation,在attributes变量中添加GENERIC_IMPL_MARKER值,设置为true,其中:attributes变量参数类型为Map<Object,Object>,并且该变量只在调用者端,不会出现在线路上。

    4.3.1.2)获取调用的方法名,调用的参数类型和参数值,对参数类型进行解析修改,效果如下:

    java.lang.Object[][].class => "java.lang.Object[][]"
    

    4.3.1.3)如果泛化调用方式为bean方式,遍历参数值,并序列化为JavaBeanDescriptor类型数据;

    如果是其他调用方式,深入对象,将复杂类型转化为简单类型。

    4.3.1.4)如果方法返回类型是CompletableFuture,则设置方法名为$invokeAysnc;其他情况设置方法名为$invoke

    4.3.1.5)将参数类型设置为new Class<?>[]{String.class, String[].class, Object[].class};,这样转化为传统的泛化调用方式,并将参数值设置为类似new Object[]{methodName, types, args}的格式。

    4.3.2)当调用方法为$invoke或者$invokeAysnc,并且方法参数变量数量为3个时,首先获取泛化参数,然后判断泛化调用方式:

    4.3.2.1)如果是nativejava方式,判断参数是否为byte[]类型;如果不是,则说明参数传递异常。

    4.3.2.2)如果是bean方式,则判断参数是否为JavaBeanDescriptor类型;如果不是,则说明参数传递异常。

    4.3.3)在RpcInvocation中的attachment中设置是否泛化调用。

    4.4) 调用invoker#invoke方法

    当远程调用返回结果时,会触发onResponse方法。

    从url中获取generic参数值,从invocation中获取方法名,方法参数类型,参数值GENERIC_IMPL是否存在。

    如果参数值GENERIC_IMPL存在,并且为true:
    从invoker中获取接口类型,当方法不是$invoke也不是$invokeAysnc,并且接口父类型为GenericService时,从invoker中的interface参数中获取真实的interface,并转化为Class类型。

    之后,是所有不同调用方式的统一处理。

    获取调用的方法Method,如果调用方式是Bean方式:
    判断appReponse的value是否为JavaBeanDescriptor类型,如果是,将该value进行反序列化,重新赋值;如果不是,则抛出异常。

    如果是其他调用方式,则使用PojoUtils工具类进行反序列化。

    5)调用AsyncToSyncInvoker#invoke方法

    6)调用DubboInvoker#invoke方法,发起远程调用。

    服务端收到请求后,在最终调用AbstractProxyInvoker#invoke方法之前,会先执行一个过滤器链,和上述的消费端的类似,其中会经过一个GenericFilter,该类是服务端实现泛化调用功能的重要步骤

    GenericFilter中,首先判断方法名是否为$invoke或者$invokeAsync,由此来判定是否为泛化调用,如果方法名不是这两个,则直接调用下一个invoker;如果是,则执行下面的逻辑。

    获取参数名称、参数类型、参数值,通过反射获取调用方法,根据不同的调用方式进行反序列化,获取实际调用方法的相关信息,然后将RpcInvocation中的相关信息进行替换:

    code3.png

    Dubbo2.7下的超时机制

    在上述中的ConsumerContextFilter#invoke中涉及到了超时情况的处理,使用了TimeoutCountDown类,是2020.5.1日提交的信息。

    Dubbo2.6版本中,在HeaderExchangeChannel中进行远程调用前,会创建一个DefaultFuture对象,里面有一个静态代码块,创建一个线程,执行RemotingInvocationTimeoutScan任务,轮询FUTURES集合,通过DefaultFuture记录的开始时间与当前时间进行计算,判断是否超时,如果超时,则直接创建一个超时的Response,并将该DefaultFuture从FUTURES集合中移除。

    当服务端在一定时间内执行完逻辑后,会发送给客户端,在此之前,客户端通过定时任务已经将相关信息从FUTURES集合中移除,所以这次服务端发送过来的信息在FUTURES集合中查找不到,所以不做处理,服务端的这次发送显得有些多余,对于客户端来说是无用的。

    所以在2020.5.1日,提交了上述代码来解决这一问题。

    code4.png

    DubboInvoker#doInvoke方法中进行远程远程调用前,会计算timeout。

    code5.png

    首先从RpcContext中获取timeout-countdown变量值

    如果为空:
    ----》 从url中通过timeout参数获取超时时间,默认是1000,
    ----》从url中获取enable-timeout-countdown参数值,默认是false,通过该参数开启新的超时机制(使用了上述的TimeoutCountDown)
    ----》如果开启了,在attachments变量里添加_TO变量,值为计算后的timeout

    如果不为空:
    将其转化为 TimeoutCountDown对象,计算剩余的有效时间,将其设置为新的timeout,并将其添加到_TO变量,值为计算后的timeout。

    code6.png

    那TimeoutCountDown对象在什么时候被创建的呢?

    在服务端的ContextFilter。

    从RpcInvocation中获取“_TO”变量的值,如果不为-1,则在RpcContext中创建一个TimeoutCountDown。

    code7.png

    在后续的TimeoutFilter中,从RpcContext中获取TimeoutCountDown,如果超时了,则清空处理的结果。

    code8.png

    在消费端的ConsumerContextFilter中,在进行远程调用前,同样从RpcContext中获取TimeOutCountDown,当过期时,直接返回一个异常,而不再进行远程调用。

    code10.png

    疑问点

    但是最后有一个疑问,即当超时后,服务端仍然会发送给客户端,虽然结果已经被清空,(可能自己的理解问题)。

    code11.png

    下面的这个建议感觉挺好的,但是在Dubbo在没有发现类似的机制。


    code12.png

    相关文章

      网友评论

          本文标题:Dubbo2.7的泛化调用和超时机制

          本文链接:https://www.haomeiwen.com/subject/xsfbcltx.html