美文网首页
dubbo源码分析-Cluster

dubbo源码分析-Cluster

作者: 圣村的希望 | 来源:发表于2019-12-23 17:47 被阅读0次

         Cluster主要就是用来应对出错情况采取的策略,可以看下在dubbo官网中对Cluster的定位:

    cluster-key.png

         Cluster是整个集群的抽象,Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。list Directory、route Router、select LoadBalance最后选择一个具体Invoker,代理通过具体Invoker进行远程调用获取结果。

         接下来看下Cluster类的继承关系:

    Cluster-Struct.png
    @SPI(FailoverCluster.NAME)
    public interface Cluster {
    ​
        /**
         * Merge the directory invokers to a virtual invoker.
         * 合并多个directory形成一个invoker
         * @param <T>
         * @param directory
         * @return cluster invoker
         * @throws RpcException
         */
        @Adaptive
        <T> Invoker<T> join(Directory<T> directory) throws RpcException;
    ​
    }
    

         可以看到Cluster的具体实现类很多,每个不同的实现类对应不同的集群实现策略:

    • Failsafe Cluster:失败安全,出现异常时,直接忽略。失败安全就是当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作

    • Failover Cluster:失败自动切换,当调用出现失败的时候,会自动切换集群中其他服务器,来获得invoker重试,通常用于读操作,但重试会带来更长延迟。一般都会设置重试次数。默认策略

    • Failfast Cluster:只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。

    • Failback Cluster:失败自动恢复,在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用记录并且重传,适合执行消息通知等操作。

    • Forking Cluster:会在线程池中运行多个线程,来调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。一般会设置最大并行数。

    • Available Cluster:调用第一个可用的服务器,仅仅应用于多注册中心。

    • Broadcast Cluster:广播调用所有提供者,逐个调用,在循环调用结束后,只要任意一台报错就报错。通常用于通知所有提供者更新缓存或日志等本地资源信息

    • Mergeable Cluster:分组聚合。

    • MockClusterWrapper:本地伪装。

      下面以FailoverCluster为例,以服务调用为线进行源码分析:

    public class FailoverCluster implements Cluster {
    ​
        public final static String NAME = "failover";
    ​
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<T>(directory);
        }
    }
    

         FailoverCluster对join的实现就是创建一个FailoverClusterInvoker,交给FailoverClusterInvoker来进行实现处理。下面开始以服务引用为线来进行分析Cluster,服务引用是给对应的接口创建了代理类,这点有点类似mybatis的Mapper接口的实现,只不过是在dubbo中使用的是javasisst实现的动态代理,说起代理类,那就少不了对InvocationHandler的叙述,他是代理的行为表现。

    public class InvokerInvocationHandler implements InvocationHandler {
    ​
        private final Invoker<?> invoker;
    ​
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    ​
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }
    

         在代理行为中,只是把对应的方法和参数封装成一次方法调用RpcInvocation,然后调用Invoker的invoker方法,默认的集群策略是FailoverCluster,所以这里的Invoker是FailoverClusterInvoker,所以一切又都回到了FailoverClusterInvoker,FailoverClusterInvoker继承自AbstractClusterInvoker,所以这里也是调用AbstractClusterInvoker的invoke方法:

    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            //获取重试次数,默认2次,总共最多调用3次
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                //这里是为了及时感应invokers的变化
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyinvokers = list(invocation);
                    // check again
                    checkInvokers(copyinvokers, invocation);
                }
                //根据负载均衡策略获取可用Invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    //调用具体Invoker的invoke方法
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + invocation.getMethodName()
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyinvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    //如果是用户异常直接抛出
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                    + invocation.getMethodName() + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyinvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
        }
    }
    

         FailoverClusterInvoker是失败重试策略,默认重试3次,重试次数可以具体配置。根据负载均衡获取可用Invoker进行具体invoke调用。Cluster策略不难,也没什么东西,就是对集群出错策略的抽象处理,封装了对Directory、Router和Balance的处理,不同的策略对应不同的实现类,具体通过SPI进行加载,默认为FailoverCluster,可以通过如下配置改变集群容错策略:

    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failover" retries="2" />
    

    画一个时序图(待续)

    相关文章

      网友评论

          本文标题:dubbo源码分析-Cluster

          本文链接:https://www.haomeiwen.com/subject/nrctoctx.html