rpc-dubbo

作者: 一只小星_ | 来源:发表于2019-08-14 16:58 被阅读0次

    如果有一种方式能让我们像调用本地服务一样调用远程服务,而让调用者对网络通信这些细节透明。这种方式其实就是RPC(Remote Procedure Call Protocol),在各大互联网公司中被广泛使用,如阿里巴巴的hsf、dubbo(开源)。

    Dubbo

    Dubbo spi

    SPI 是一种服务发现机制。SPI 的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类。这样可以在运行时,动态为接口替换实现类。SPI 机制在第三方框架中也有所应用,比如 Dubbo 就是通过 SPI 机制加载所有的组件。不过,Dubbo 并未使用 Java 原生的 SPI 机制,而是对其进行了增强。
    下面是dubbo spi源码分析

    public class DubboSPITest {
        public static void main(String[] args) {
            先通过ExtensionLoader获取一个与扩展类对应的ExtensionLoader的实例
            extensionLoader竟然是放在ConcurrentHashMap里面的,会用类名去concurrenthashmap里面拿
            如果没拿到,就new一个,然后放到map里。
            ExtensionLoader<Robot> extensionLoader =
                    ExtensionLoader.getExtensionLoader(Robot.class);
    
             * optimusPrime和bumblebee是定义在 META-INF/dubbo 路径下
             * dubbo spi用健值对进行配置,配置文件内容为:
             *
             * optimusPrime = org.apache.spi.OptimusPrime
             * bumblebee = org.apache.spi.Bumblebee
    
            /**
             * 通过extensionLoader获取扩展类对象
             */
            Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
    
            Robot bumblebee = extensionLoader.getExtension("bumblebee");
            /**
             * 可以获取到两个Robot的实现类
             */
        }
    }
    

    下面说

     Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
    

    直接通过getExtension获取扩展类对象,输入配置文件里面的key值。
    源码做的事情:首先从一个ConcurrentHashMap里面去拿holder,从holder里面拿instance,也就是如果能拿到的话就已经有optimusPrime对应的对象了,如果拿不到的话instance = createExtension(name);创建扩展对象。

    private T createExtension(String name){
     Class<?> clazz = getExtensionClasses().get(name);
     try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                // 通过反射创建实例
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            // 向实例中注入依赖
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                // 循环创建 Wrapper 实例
                for (Class<?> wrapperClass : wrapperClasses) {
                    // 将当前 instance 作为参数传给 Wrapper 的构造方法,并通过反射创建 Wrapper 实例。
                    // 然后向 Wrapper 实例中注入依赖,最后将 Wrapper 实例再次赋值给 instance 变量
                    instance = injectExtension(
                        (T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("...");
        }
    
    }
    

    源码做的事情:
    1.通过 getExtensionClasses 从配置文件获取所有的拓展类
    2.通过反射创建拓展对象
    3.向拓展对象中注入依赖
    4.将拓展对象包裹在相应的 Wrapper 对象中

    injectExtension(instance)做了什么?这个用的Dubbo IOC。
    这个IOC的做法是,通过反射获取到类的所有方法,遍历方法是不是以set开头是不是只有一个参数是不是public,若有,则通过 ObjectFactory 获取依赖对象,最后通过反射调用 setter 方法将依赖设置到目标对象中。

    AdaptiveExtensionFactory类 
    public <T> T getExtension(Class<T> type, String name) {
            for (ExtensionFactory factory : factories) {
                T extension = factory.getExtension(type, name);
                if (extension != null) {
                    return extension;
                }
            }
            return null;
        }
    

    objectFactory 变量的类型为 AdaptiveExtensionFactory,AdaptiveExtensionFactory 内部维护了一个 List<ExtensionFactory> factories 列表,用于存储其他类型的 ExtensionFactory,遍历所有factory,从里面找对应的类。Dubbo 目前提供了两种 ExtensionFactory,分别是 SpiExtensionFactory 和 SpringExtensionFactory。前者用于创建自适应的拓展,后者是用于从 Spring 的 IOC 容器中获取所需的拓展。

    Dubbo 自适应扩展机制

    有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。这听起来有些矛盾。拓展未被加载,那么拓展方法就无法被调用(静态方法除外)。拓展方法未被调用,拓展就无法被加载。对于这个矛盾的问题,Dubbo 通过自适应拓展机制很好的解决了。
    我的理解就是,dubbo收到一个调用请求,肯定是带着类明和方法名的,那么可以get到,只有在调用的时候,通过 SPI 加载具体的拓展实现类,并调用拓展对象的同名方法。

    为什么自适应扩展不需要入参???

    获取自适应扩展
    extensionLoader.getAdaptiveExtension();
    
    public T getAdaptiveExtension() {
           /从缓存中获取自适应扩展
            Object instance = cachedAdaptiveInstance.get();
            if (instance == null) {
             /缓存中没有的话,其实就是要createAdaptiveExtension(),这个方法就是获取自适应扩展类,并反射一个实例出来。
                if (createAdaptiveInstanceError == null) {
                    synchronized (cachedAdaptiveInstance) {
                        instance = cachedAdaptiveInstance.get();
                        if (instance == null) {
                            try {
                                instance = createAdaptiveExtension();
                                cachedAdaptiveInstance.set(instance);
                            } catch (Throwable t) {
                                createAdaptiveInstanceError = t;
                                throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                            }
                        }
                    }
                } else {
                    throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
                }
            }
    
            return (T) instance;
        }
    

    那么自适应扩展类的获取createAdaptiveExtensionClassCode()有200多行,可以简单说一下。

    Dubbo 负载均衡

    LoadBalance 中文意思为负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。负载均衡可分为软件负载均衡和硬件负载均衡。在我们日常开发中,一般很难接触到硬件负载均衡。但软件负载均衡还是可以接触到的,比如 Nginx。在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。

    RandomLoadBalance

    RandomLoadBalance 是加权随机算法的具体实现。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。
    当调用次数比较少时,Random 产生的随机数可能会比较集中,此时多数请求会落到同一台服务器上。这个缺点并不是很严重,多数情况下可以忽略。RandomLoadBalance 是一个简单,高效的负载均衡实现,因此 Dubbo 选择它作为缺省实现。

    LeastActiveLoadBalance

    LeastActiveLoadBalance 翻译过来是最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。

    ConsistentHashLoadBalance

    它的工作过程是这样的,首先根据 ip 或者其他的信息为服务提供者节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询时,则根据请求生成一个 hash 值。然后查找第一个大于或等于该 hash 值的服务提供者,并到这个节点中查询。如果当前节点挂了,则在下一次查询时,查找另一个大于其 hash 值的节点即可。大致效果如下图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。

    RoundRobinLoadBalance

    加权轮询。这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。

    相关文章

      网友评论

          本文标题:rpc-dubbo

          本文链接:https://www.haomeiwen.com/subject/ppvwjctx.html