美文网首页
Java进阶-Dubbo-进阶

Java进阶-Dubbo-进阶

作者: GIT提交不上 | 来源:发表于2022-02-08 23:51 被阅读0次

    一、服务调用过程

    1.1 服务调用方式

      Dubbo 服务调用过程:

    image.png

      Dubbo 支持同步和异步两种调用方式,其中异步调用还可细分为“有返回值”的异步调用和“无返回值”的异步调用。默认情况下,Dubbo 使用同步调用方式。

      Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。

      代理类:将运行时参数存储到数组中,然后调用 InvocationHandler 接口实现类的 invoke 方法,得到调用结果,最后将结果转型并返回给调用方。

    public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
        // 方法数组
        public static Method[] methods;
        private InvocationHandler handler;
    
        public proxy0(InvocationHandler invocationHandler) {
            this.handler = invocationHandler;
        }
    
        public proxy0() {
        }
    
        public String sayHello(String string) {
            // 将参数存储到 Object 数组中
            Object[] arrobject = new Object[]{string};
            // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
            Object object = this.handler.invoke(this, methods[0], arrobject);
            // 返回调用结果
            return (String)object;
        }
    
        /** 回声测试方法 */
        public Object $echo(Object object) {
            Object[] arrobject = new Object[]{object};
            Object object2 = this.handler.invoke(this, methods[1], arrobject);
            return object2;
        }
    }
    
    服务调用方式.png
    public class DefaultFuture implements ResponseFuture {
        
        private static final Map<Long, Channel> CHANNELS = 
            new ConcurrentHashMap<Long, Channel>();
    
        private static final Map<Long, DefaultFuture> FUTURES = 
            new ConcurrentHashMap<Long, DefaultFuture>();
        
        private final long id;
        private final Channel channel;
        private final Request request;
        private final int timeout;
        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
        private volatile Response response;
        
        public DefaultFuture(Channel channel, Request request, int timeout) {
            this.channel = channel;
            this.request = request;
            
            // 获取请求 id,这个 id 很重要,后面还会见到
            this.id = request.getId();
            this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 存储 <requestId, DefaultFuture> 映射关系到 FUTURES 中
            FUTURES.put(id, this);
            CHANNELS.put(id, channel);
        }
        
        @Override
        public Object get() throws RemotingException {
            return get(timeout);
        }
    
        @Override
        public Object get(int timeout) throws RemotingException {
            if (timeout <= 0) {
                timeout = Constants.DEFAULT_TIMEOUT;
            }
            
            // 检测服务提供方是否成功返回了调用结果
            if (!isDone()) {
                long start = System.currentTimeMillis();
                lock.lock();
                try {
                    // 循环检测服务提供方是否成功返回了调用结果
                    while (!isDone()) {
                        // 如果调用结果尚未返回,这里等待一段时间
                        done.await(timeout, TimeUnit.MILLISECONDS);
                        // 如果调用结果成功返回,或等待超时,此时跳出 while 循环,执行后续的逻辑
                        if (isDone() || System.currentTimeMillis() - start > timeout) {
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    lock.unlock();
                }
                
                // 如果调用结果仍未返回,则抛出超时异常
                if (!isDone()) {
                    throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
                }
            }
            
            // 返回调用结果
            return returnFromResponse();
        }
        
        @Override
        public boolean isDone() {
            // 通过检测 response 字段为空与否,判断是否收到了调用结果
            return response != null;
        }
        
        private Object returnFromResponse() throws RemotingException {
            Response res = response;
            if (res == null) {
                throw new IllegalStateException("response cannot be null");
            }
            
            // 如果调用结果的状态为 Response.OK,则表示调用过程正常,服务提供方成功返回了调用结果
            if (res.getStatus() == Response.OK) {
                return res.getResult();
            }
            
            // 抛出异常
            if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
                throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
            }
            throw new RemotingException(channel, res.getErrorMessage());
        }
        
        // 省略其他方法
    }
    

    1.2 服务消费方发送请求

    1.2.1 发送请求

    发送请求.png
    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
    
        boolean success = true;
        int timeout = 0;
        try {
            // 发送消息(包含请求和响应消息)
            ChannelFuture future = channel.write(message);
            
            // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
            //   1. true: 等待消息发出,消息发送失败将抛出异常
            //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
            // 默认情况下 sent = false;
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                // 等待消息发出,若在规定时间没能发出,success 会被置为 false
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message ...");
        }
    
        // 若 success 为 false,这里抛出异常
        if (!success) {
            throw new RemotingException(this, "Failed to send message ...");
        }
    }
    

    1.2.2 请求编码

      Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。

    image.png image.png

      ExchangeCodec#encode

    1.3 服务提供方接收请求

    1.3.1 请求解码

      ExchangeCodec#decode

      检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测,最后调用 decodeBody 方法进行后续的解码工作。

      DecodeableRpcInvocation#decode

      通过反序列化将诸如 path、version、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。

    1.3.2 调用服务

      线程派发模型:Dubbo 将底层通信框架中接收请求的线程称为 IO 线程。如果一些事件处理逻辑可以很快执行完,比如只在内存打一个标记,此时直接在 IO 线程上执行该段逻辑即可。但如果事件的处理逻辑比较耗时,比如该段逻辑会发起数据库查询或者 HTTP 请求。此时我们就不应该让事件处理逻辑在 IO 线程上执行,而是应该派发到线程池中去执行。

      Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,其本身并不具备线程派发能力。Dubbo 支持 5 种不同的线程派发策略,默认为all。

    image.png

    AllChannelHandler#connected/disconnected/received/caught

      请求对象会被封装 ChannelEventRunnable 中,ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是HeaderExchangeHandler。

      DubboProtocol 类中的匿名类对象逻辑:获取与指定服务对应的 Invoker 实例,并通过 Invoker 的 invoke 方法调用服务逻辑。

    ChannelEventRunnable#run()
      —> DecodeHandler#received(Channel, Object)
        —> HeaderExchangeHandler#received(Channel, Object)
          —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
            —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
              —> Filter#invoke(Invoker, Invocation)
                —> AbstractProxyInvoker#invoke(Invocation)
                  —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                    —> DemoServiceImpl#sayHello(String)
    

    1.4 服务提供方返回调用结果

      服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回。

    1.5 服务消费方接收调用结果

      服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。

      线程池中的线程并非用户的调用线程,所以要想办法将响应对象从线程池线程传递到用户线程上。用户线程在发送完请求后会调用 DefaultFuture 的 get 方法等待响应对象的到来。当响应对象到来后,用户线程会被唤醒,并通过调用编号获取属于自己的响应对象。

      DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。

    image.png
    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
        
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // 处理请求,前面已分析过,省略
                } else if (message instanceof Response) {
                    // 处理响应
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    // telnet 相关,忽略
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    
        static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                // 继续向下调用
                DefaultFuture.received(channel, response);
            }
        }
    }
    
    public class DefaultFuture implements ResponseFuture {  
        
        private final Lock lock = new ReentrantLock();
        private final Condition done = lock.newCondition();
        private volatile Response response;
        
        public static void received(Channel channel, Response response) {
            try {
                // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    // 继续向下调用
                    future.doReceived(response);
                } else {
                    logger.warn("The timeout response finally returned at ...");
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        }
    
        private void doReceived(Response res) {
            lock.lock();
            try {
                // 保存响应对象
                response = res;
                if (done != null) {
                    // 唤醒用户线程
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
            if (callback != null) {
                invokeCallback(callback);
            }
        }
    }
    

    二、服务目录

      集群容错源码包含四个部分:

    • 服务目录 Directory
    • 服务路由 Router
    • 集群 Cluster
    • 负载均衡 LoadBalance

      服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。

      服务目录:可以看做是Invoker 集合,且这个集合中的元素会随注册中心的变化而进行动态调整。

      继承体系:StaticDirectory & RegistryDirectory

    image.png 服务目录.png

    三、服务路由

      服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者。服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者。

    • 条件路由 ConditionRouter
    • 脚本路由 ScriptRouter
    • 标签路由 TagRouter

      条件路由规则的格式如下:

    [服务消费者匹配条件] => [服务提供者匹配条件]

      如果服务消费者匹配条件为空,表示不对服务消费者进行限制。如果服务提供者匹配条件为空,表示对某些服务消费者禁用服务。

    3.1 表达式解析

      条件表达式的解析过程始于 ConditionRouter 的构造方法。ConditionRouter 构造方法先是对路由规则做预处理,然后调用 parseRule 方法分别对服务提供者和消费者规则进行解析,最后将解析结果赋值给 whenCondition 和 thenCondition 成员变量。

    public ConditionRouter(URL url) {
        this.url = url;
        // 获取 priority 和 force 配置
        this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
        this.force = url.getParameter(Constants.FORCE_KEY, false);
        try {
            // 获取路由规则
            String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            rule = rule.replace("consumer.", "").replace("provider.", "");
            // 定位 => 分隔符
            int i = rule.indexOf("=>");
            // 分别获取服务消费者和提供者匹配规则
            String whenRule = i < 0 ? null : rule.substring(0, i).trim();
            String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
            // 解析服务消费者匹配规则
            Map<String, MatchPair> when = 
                StringUtils.isBlank(whenRule) || "true".equals(whenRule) 
                    ? new HashMap<String, MatchPair>() : parseRule(whenRule);
            // 解析服务提供者匹配规则
            Map<String, MatchPair> then = 
                StringUtils.isBlank(thenRule) || "false".equals(thenRule) 
                    ? null : parseRule(thenRule);
            // 将解析出的匹配规则分别赋值给 whenCondition 和 thenCondition 成员变量
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
    

    3.2 服务路由实现

      服务路由的入口方法是 ConditionRouter 的 route 方法。route 方法先是调用 matchWhen 对服务消费者进行匹配,如果匹配失败,直接返回 Invoker 列表。如果匹配成功,再对服务提供者进行匹配,匹配逻辑封装在了 matchThen 方法中。

    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        if (invokers == null || invokers.isEmpty()) {
            return invokers;
        }
        try {
            // 先对服务消费者条件进行匹配,如果匹配失败,表明服务消费者 url 不符合匹配规则,
            // 无需进行后续匹配,直接返回 Invoker 列表即可。比如下面的规则:
            //     host = 10.20.153.10 => host = 10.0.0.10
            // 这条路由规则希望 IP 为 10.20.153.10 的服务消费者调用 IP 为 10.0.0.10 机器上的服务。
            // 当消费者 ip 为 10.20.153.11 时,matchWhen 返回 false,表明当前这条路由规则不适用于
            // 当前的服务消费者,此时无需再进行后续匹配,直接返回即可。
            if (!matchWhen(url, invocation)) {
                return invokers;
            }
            List<Invoker<T>> result = new ArrayList<Invoker<T>>();
            // 服务提供者匹配条件未配置,表明对指定的服务消费者禁用服务,也就是服务消费者在黑名单中
            if (thenCondition == null) {
                logger.warn("The current consumer in the service blacklist...");
                return result;
            }
            // 这里可以简单的把 Invoker 理解为服务提供者,现在使用服务提供者匹配规则对 
            // Invoker 列表进行匹配
            for (Invoker<T> invoker : invokers) {
                // 若匹配成功,表明当前 Invoker 符合服务提供者匹配规则。
                // 此时将 Invoker 添加到 result 列表中
                if (matchThen(invoker.getUrl(), url)) {
                    result.add(invoker);
                }
            }
            
            // 返回匹配结果,如果 result 为空列表,且 force = true,表示强制返回空列表,
            // 否则路由结果为空的路由规则将自动失效
            if (!result.isEmpty()) {
                return result;
            } else if (force) {
                logger.warn("The route result is empty and force execute ...");
                return result;
            }
        } catch (Throwable t) {
            logger.error("Failed to execute condition router rule: ...");
        }
        
        // 原样返回,此时 force = false,表示该条路由规则失效
        return invokers;
    }
    

    四、集群

      集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。

      集群容错组件:

    image.png

      集群工作流程:集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List<Invoker>。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

      容错方式:

    • Failover Cluster - 失败自动切换
    • Failfast Cluster - 快速失败
    • Failsafe Cluster - 失败安全
    • Failback Cluster - 失败自动恢复
    • Forking Cluster - 并行调用多个服务提供者

    4.1 源码分析

      Cluster 接口和相关实现类:仅用于生成 Cluster Invoker。

    public class FailoverCluster implements Cluster {
    
        public final static String NAME = "failover";
    
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            // 创建并返回 FailoverClusterInvoker 对象
            return new FailoverClusterInvoker<T>(directory);
        }
    }
    
    public class FailbackCluster implements Cluster {
    
        public final static String NAME = "failback";
    
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            // 创建并返回 FailbackClusterInvoker 对象
            return new FailbackClusterInvoker<T>(directory);
        }
    
    }
    
    Cluster Invoker.png
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;
    
        // 绑定 attachments 到 invocation 中.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
    
        // 列举 Invoker
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            // 加载 LoadBalance
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        
        // 调用 doInvoke 进行后续操作
        return doInvoke(invocation, invokers, loadbalance);
    }
    
    // 抽象方法,由子类实现
    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                           LoadBalance loadbalance) throws RpcException;
    

    五、负载均衡

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.isEmpty())
            return null;
        // 如果 invokers 列表中仅有一个 Invoker,直接返回即可,无需进行负载均衡
        if (invokers.size() == 1)
            return invokers.get(0);
        
        // 调用 doSelect 方法进行负载均衡,该方法为抽象方法,由子类实现
        return doSelect(invokers, url, invocation);
    }
    
    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
    

      服务提供者权重计算,该过程主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,避免让服务在启动之初就处于高负载状态。服务预热是一个优化手段,与此类似的还有 JVM 预热。主要目的是让服务启动后“低功率”运行一段时间,使其效率慢慢提升至最佳状态。

    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        // 从 url 中获取权重 weight 配置值
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            // 获取服务提供者启动时间戳
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                // 计算服务提供者运行时长
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                // 获取服务预热时间,默认为10分钟
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                // 如果服务运行时间小于预热时间,则重新计算服务权重,即降权
                if (uptime > 0 && uptime < warmup) {
                    // 重新计算服务权重
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }
    
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        // 计算权重,下面代码逻辑上形似于 (uptime / warmup) * weight。
        // 随着服务运行时间 uptime 增大,权重计算值 ww 会慢慢接近配置值 weight
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }
    

      LeastActiveLoadBalance:最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。

    负载均衡.png

       ConsistentHashLoadBalance:一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的,首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。大致效果如下图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。

    image.png

       相同颜色的节点均属于同一个服务提供者,比如 Invoker1-1,Invoker1-2,……, Invoker1-160。这样做的目的是通过引入虚拟节点,让 Invoker 在圆环上分散开来,避免数据倾斜问题。

    image.png

    六、其他

    深入理解Dubbo核心模型Invoke
    Dubbo Adaptive机制详解

    相关文章

      网友评论

          本文标题:Java进阶-Dubbo-进阶

          本文链接:https://www.haomeiwen.com/subject/aknbkrtx.html