美文网首页Dubbo
3-dubbo源码分析之服务引用

3-dubbo源码分析之服务引用

作者: 致虑 | 来源:发表于2018-08-30 18:55 被阅读0次
    • 先看官网两张图【引用来自官网】:


      image.png
    image.png image.png

    官网说明:

    • 1.首先 ReferenceConfig 类的 init 方法调用 Protocol 的 refer 方法生成 Invoker 实例(如上图中的红色部分),这是服务消费的关键。接下来把 Invoker 转换为客户端需要的接口(如:HelloWorld)。
    • 2.关于每种协议如 RMI/Dubbo/Web service 等它们在调用 refer 方法生成 Invoker 实例的细节和上一章节所描述的类似。

    一.概览
    • 先看几个入口配置

      <dubbo:reference id="demoService" init="true" check="false" sticky="true" mock="fail-mock" interface="com.alibaba.dubbo.demo.DemoService"/>
      
      ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
      context.start();
      DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
      
      while (true) {
          try {
              Thread.sleep(1000);
              String hello = demoService.sayHello("world"); // call remote method
              System.out.println(hello); // get result
          } catch (Throwable throwable) {
          }
      }
      

      第一个是consumer的配置,第二个是代码入口;当然至于registry等其他配置跟provider相似,这里不再贴出。reference的相关配置细节后面一一道出,先笼统概叙下引用初始化大致做的几件事

      • 1.检查初始化所有的配置信息
      • 2.监听注册中心
      • 3.连接服务提供者端进行服务引用
      • 4.创建服务代理并返回(消费者最终得到的是服务的代理。初始化主要做的事情就是引用对应的远程服务)

      看上面的图应该就知道其中过程了。再借用官网一张就截图:


      image.png
    二.容器初始化
    • 既然是容器,肯定离不开spring了,看看ReferenceBean继承的体系:


      image.png

      容器触发:

    • 1.ApplicationContextAware --> setApplicationContext

    • 2.InitializingBean --> afterPropertySet

    • 3.FactoryBean --> xxx

    • 4.DisposableBean --> destory

    • 5.ReferenceConfig 引用的核心类了

      既然继承了这些,那么就应当发挥各自的作用,这也就是在 reference配置中的init相关了,init配置决定是否延迟加载,默认为延迟初始化consumer的,也就是获取bean的时候才会初始化。比如HelloService helloService = (HelloService) applicationContext.getBean("helloService");这个触发执行时getBean的时候,才会初始化。
      当init为true时,就会立即初始化了,也就是FactoryBean接口的意义所在

    • ReferenceBean初始化,看他的父类ReferenceConfig中有几个核心的变量


      image.png

      这里一看就是SPI扩展出来的,直接看一下cluster的字节码:

     import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
         public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.cluster.Directory {
           if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
           if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
           String extName = url.getParameter("cluster", "failover");
           if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
           com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
           return extension.join(arg0);
         }
    }
    

    看到“failover”就知道后面使用的是FailoverCluster就好,具体的分析后面再讲。

    • 加载1:ReferenceBean实现FactoryBean的getObject()方法:

      @Override
      public Object getObject() throws Exception {
          logger.info(Thread.currentThread().getId() + "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%");
          return get();
      }
      
    • 加载2:ReferenceBean实现InitializingBean的afterPropertiesSet()方法:

      /** 实现了InitializingBean接口,接着会回调afterPropertySet方法 */
      @Override
      @SuppressWarnings({"unchecked"})
      public void afterPropertiesSet() throws Exception {
      
          ...
      
          Boolean b = isInit();
          if (b == null && getConsumer() != null) {
              b = getConsumer().isInit();
          }
          if (b != null && b.booleanValue()) {
              getObject();
          }
      }
      

      看到这里也回调到第一个加载点:getObject()

    • 上面两处初始化的点就是依赖你的init配置选择了,那么核心的方法已经出现,跟着这里往里走就是具体分析了。此处整个容器的加载初始化就在这里进入。


    二.服务引用
    • 直接跟着容器的入口进来后会发现到了ReferenceBean的父类ReferenceConfig,此处看核心调用:

      /**
       * consumer初始化入口
       *
       * 初始化的过程:主要做了注册到注册中心,监听注册中心,连接到服务提供者端,创建代理。这些都是为了下面消费者和提供者之间的通信做准备。
       *
       * 关于Zookeeper作为注册中心时,服务消费者启动时要做的事情有:
       * 1.订阅/dubbo/.../providers目录下的提供者URL地址。
       * 2.向/dubbo/.../consumers目录下写入自己的URL地址。
       *
       * */
      public synchronized T get() {
          if (destroyed) {
              throw new IllegalStateException("Already destroyed!");
          }
          if (ref == null) {
              init();
          }
          return ref;
      }
      

      划重点init()

      /**
       * init()方法会先检查初始化所有的配置信息,然后调用ref = createProxy(map);创建代理,消费者最终得到的是服务的代理。初始化主要做的事情就是引用对应的远程服务,大概的步骤:
       * 1.监听注册中心
       * 2.连接服务提供者端进行服务引用
       * 3.创建服务代理并返回
       */
      private void init() {
          if (initialized) {
              return;
          }
          initialized = true;
          if (interfaceName == null || interfaceName.length() == 0) {
              throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
          }
                  
          ... ...
          // get consumer's global configuration
      
          /** 创建代理 */
          ref = createProxy(map);
          ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
          ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
      }
      
      private transient volatile boolean initialized;
      

      前面省略的一大坨跟provider初始化类似,都是相关检查配置及初始化配置,重点在创建代理这里createProxy(map),消费者最终得到的是服务的代理。引用到远程服务就要做得肯定是如下几件事:

      • 1.对注册中心进行监听
      • 2.连接provider进行服务引用
      • 3.创建服务代理,并返回

      consumer关于注册中心做了主要是两件事:

      • 1.订阅provider节点下的地址
      • 2.将自己的信息以URl形式写入注册中心

    • 1.创建代理
      createProxy(Map map)

      /**
       * 完成了schemas标签信息到invoker的转换,那么下面就是创建代理对象了
       *
       * 1.先判断是否是本地服务引用injvm
       * 2.判断是否是点对点直连
       * 3.判断是否是通过注册中心连接
       * 4.然后是服务的引用
       *
       * 这里url为: registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.5.3&pid=12272&refer=application%3Ddubbo-consumer%26dubbo%3D2.5.3%26interface%3Ddubbo.common.hello.service.HelloService%26methods%3DsayHello%26pid%3D12272%26sid=consumer%26timeout%3D100000%26timestamp%3D1489318676447registry=zookeeper&timestamp=1489318676641
       */
      @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
      private T createProxy(Map<String, String> map) {
          URL tmpUrl = new URL("temp", "localhost", 0, map);
          final boolean isJvmRefer;
          if (isInjvm() == null) {
              // 指定URL的情况下,不做本地引用
              if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                  isJvmRefer = false;
              }
      
              // 默认情况下如果本地有服务暴露,则引用本地服务.
              else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                  // by default, reference local service if there is
                  isJvmRefer = true;
              } else {
                  isJvmRefer = false;
              }
          } else {
              isJvmRefer = isInjvm().booleanValue();
          }
      
          if (isJvmRefer) {
              URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
              invoker = refprotocol.refer(interfaceClass, url);
              if (logger.isInfoEnabled()) {
                  logger.info("Using injvm service " + interfaceClass.getName());
              }
          } else {
              // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
              if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                  String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                  if (us != null && us.length > 0) {
                      for (String u : us) {
                          URL url = URL.valueOf(u);
                          if (url.getPath() == null || url.getPath().length() == 0) {
                              url = url.setPath(interfaceName);
                          }
                          if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                              urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                          } else {
                              urls.add(ClusterUtils.mergeUrl(url, map));
                          }
                      }
                  }
              } else { // assemble URL from register center's configuration
      
                  // 通过注册中心配置拼装URL
                  List<URL> us = loadRegistries(false);
                  if (us != null && !us.isEmpty()) {
                      for (URL u : us) {
                          URL monitorUrl = loadMonitor(u);
                          if (monitorUrl != null) {
                              map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                          }
                          urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                      }
                  }
                  if (urls.isEmpty()) {
                      throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                  }
              }
      
              if (urls.size() == 1) {
                  //引用远程服务由Protocol的实现来处理
                  invoker = refprotocol.refer(interfaceClass, urls.get(0));
              } else {
                  List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                  URL registryURL = null;
                  for (URL url : urls) {
                      invokers.add(refprotocol.refer(interfaceClass, url));
                      if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                          registryURL = url; // use last registry url
                      }
                  }
                  if (registryURL != null) { // registry url is available
                      // use AvailableCluster only when register's cluster is available
                      // 对有注册中心的Cluster 只用 AvailableCluster
                      URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                      invoker = cluster.join(new StaticDirectory(u, invokers));
                  } else { // not a registry url
                      invoker = cluster.join(new StaticDirectory(invokers));
                  }
              }
          }
      
          Boolean c = check;
          if (c == null && consumer != null) {
              c = consumer.isCheck();
          }
          if (c == null) {
              c = true; // default true
          }
          if (c && !invoker.isAvailable()) {
              throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
          }
          if (logger.isInfoEnabled()) {
              logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
          }
          // create service proxy
          // 返回服务代理
          /** 首先经过AbstractProxyFactory的处理,然后经过StubProxyFactoryWrapper的处理 */
          return (T) proxyFactory.getProxy(invoker);
      }
      

      注释很清楚,记得有两个核心的方法调用 -> 引用服务 + 创建代理:

      • 1.invoker = refprotocol.refer(interfaceClass, urls.get(0));
      • 2.return (T) proxyFactory.getProxy(invoker);

      debug看下url:


      image.png

      同时这里的refprotocol是扩展点,顺便看下字节码

        import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
      public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
          }
          
          public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
            if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();  
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
          }
          
          public void destroy() {
            throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
          }
          
          public int getDefaultPort() {
            throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
          }
    } 
    

    这里如果url里面没有指定protocol,便取dubbo,那么看看url中的protocol此时是啥:


    image.png

    很明显是Registry,那么此时跟踪到RegistryProtocol中的refer,那么继续跟踪会发现:


    image.png
    image.png
    这两个wrapper在provider中就遇见了,此处的协议是register,所以直接绕过,进入到RegisterProtocol:
    
    • 2.注册与引用RegisterProtocol.referrefer(Class<T> type, URL url)

      /**
       * 注册消费者到注册中心
       * 完成schemas标签信息到invoker的转换,那么下面就是创建代理对象了(序号5)
       * */
      @Override
      @SuppressWarnings("unchecked")
      public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
          url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
      
          /**
           * 根据url获取Registry对象
           * 先连接注册中心,把消费者注册到注册中心
           **/
          Registry registry = registryFactory.getRegistry(url);
      
          /**
           * 判断引用是否是注册中心RegistryService,如果是直接返回刚得到的注册中心服务
           */
          if (RegistryService.class.equals(type)) {
              return proxyFactory.getInvoker((T) registry, type, url);
          }
      
          /** 以下是普通服务,需要进入注册中心和集群下面的逻辑 */
          // group="a,b" or group="*"
          /** 获取ref的各种属性 */
          Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
      
          /** 获取分组属性 */
          String group = qs.get(Constants.GROUP_KEY);
      
          /** 先判断引用服务是否需要合并不同实现的返回结果 */
          if (group != null && group.length() > 0) {
              if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                  return doRefer(getMergeableCluster(), registry, type, url);
              }
          }
      
          /**
           * 选择配置的集群策略(cluster="failback")或者默认策略
           * ref方法中最后一步,服务的引用,返回的是一个Invoker
           */
          return doRefer(cluster, registry, type, url);
      }
      

      url情况已具体注册中心为准:


      image.png

      看上面的核心逻辑就是:

      • 1.获取注册中心 -> registryFactory.getRegistry(url);
      • 2.引用服务并且注册到注册中心 -> doRefer(cluster, registry, type, url);

      继续跟踪


      image.png

      找到ZookeeperRegisterFactory

      /**
       * ZookeeperRegistryFactory.
       *
       */
      public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
      
          private ZookeeperTransporter zookeeperTransporter;
      
          public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
              this.zookeeperTransporter = zookeeperTransporter;
          }
      
          @Override
          public Registry createRegistry(URL url) {
              /** 这里的zookeeperTransporter代码在下面,动态生成的适配类 */
              return new ZookeeperRegistry(url, zookeeperTransporter);
          }
      }
      

      看看具体的client扩展点取的情况

        package com.alibaba.dubbo.remoting.zookeeper;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class ZookeeperTransporter$Adpative implements com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter {
        public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
            if (arg0 == null) throw new IllegalArgumentException("url == null");
            
            com.alibaba.dubbo.common.URL url = arg0;
            String extName = url.getParameter("client", url.getParameter("transporter", "zkclient"));
            
            if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
            
            com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
            
            return extension.connect(arg0);
        }
    }
    

    很明显是zkClient,此处还有个点:

     @SPI("curator")
     public interface ZookeeperTransporter {
         @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
         ZookeeperClient connect(URL url);
     }
    

    方法级别的Adaptive,动态寻找自适应扩展点中的一种机制,这里很简单不多说。

    • 继续回到流程

      /**
       * dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗?  在這裡就能体现
       */
      public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
      
          /** 此处父类构造逻辑是关键 -->  AbstractRegistry*/
          super(url);
          if (url.isAnyHost()) {
              throw new IllegalStateException("registry address == null");
          }
      
          /** 获得到注册中心中的分组,默认dubbo */
          String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
          if (!group.startsWith(Constants.PATH_SEPARATOR)) {
              group = Constants.PATH_SEPARATOR + group;
          }
      
          /** 注册到注册中心的节点 */
          this.root = group;
      
          /**
           * zk 客户端
           *
           * ZkclientZookeeperTransporter的connect方法
           * 直接返回一个ZkclientZookeeperClient实例
           * 具体的步骤是,new一个ZkClient实例,然后订阅了一个状态变化的监听器
           */
          zkClient = zookeeperTransporter.connect(url);
      
          //添加一个状态改变的监听器
          zkClient.addStateListener(new StateListener() {
              @Override
              public void stateChanged(int state) {
                  if (state == RECONNECTED) {
                      try {
                          recover();
                      } catch (Exception e) {
                          logger.error(e.getMessage(), e);
                      }
                  }
              }
          });
      }
      

      继续super到super --> FailbackRegistry -> AbstractRegistry
      FailbackRegistry:

      public FailbackRegistry(URL url) {
      
          /** 此处父类构造逻辑是关键 -->  AbstractRegistry*/
          super(url);
      
          /** ,建立线程池,定时的检测并连接注册中心,如果失败了就重连 */
          this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
          this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                  // 检测并连接注册中心
                  // Check and connect to the registry
                  try {
                      //重试方法由每个具体子类实现
                      //获取到注册失败的,然后尝试注册
                      retry();
                  } catch (Throwable t) { // Defensive fault tolerance
                      logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                  }
              }
          }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
      }
      

      启动一个线程池,用以失败重试,其他核心逻辑继续在父类中
      AbstractRegistry:

      /**
       * dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗? 当然可以, 答案如下:
       * zookeeper的信息会缓存到本地作为一个缓存文件,并且转换成properties对象方便使用
       */
      public AbstractRegistry(URL url) {
      
          //设置registryUrl
          setUrl(url);
          // Start file save timer
      
          // 启动文件保存定时器
          syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
      
          //会先去用户主目录下的.dubbo目录下加载缓存注册中心的缓存文件比如:dubbo-registry-127.0.0.1.cache
          String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
          File file = null;
          if (ConfigUtils.isNotEmpty(filename)) {
              file = new File(filename);
              if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                  if (!file.getParentFile().mkdirs()) {
                      throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                  }
              }
          }
          this.file = file;
      
          /** 缓存到本地作为一个缓存文件     缓存文件存在的话就把文件读进内存中 */
          loadProperties();
      
          //先获取backup url  然后通知订阅
          notify(url.getBackupUrls());
      }
      

      做了一下两件事:

      • 1.加载属性
      • 2.通知订阅,通知的是什么就看consumeer订阅了什么,然后回调事件做了什么

      继续看notify

      protected void notify(List<URL> urls) {
          if (urls == null || urls.isEmpty()) return;
      
          //getSubscribed()方法获取订阅者列表
          //订阅者Entry里每个URL都对应着n个NotifyListener
          for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
              URL url = entry.getKey();
      
              if (!UrlUtils.isMatch(url, urls.get(0))) {
                  continue;
              }
      
              Set<NotifyListener> listeners = entry.getValue();
              if (listeners != null) {
                  for (NotifyListener listener : listeners) {
                      try {
                          //通知每个监听器
                          notify(url, listener, filterEmpty(url, urls));
                      } catch (Throwable t) {
                          logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                      }
                  }
              }
          }
      }
      

      划重点:

      /** 开始更新本地缓存文件的信息 */
      protected void notify(URL url, NotifyListener listener, List<URL> urls) {
          if (url == null) {
              throw new IllegalArgumentException("notify url == null");
          }
          if (listener == null) {
              throw new IllegalArgumentException("notify listener == null");
          }
          if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
              logger.warn("Ignore empty notify urls for subscribe url " + url);
              return;
          }
          if (logger.isInfoEnabled()) {
              logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
          }
          Map<String, List<URL>> result = new HashMap<String, List<URL>>();
      
          // 获取catagory列表,providers,routers,configurators
          for (URL u : urls) {
              if (UrlUtils.isMatch(url, u)) {
      
                  // 不同类型的数据分开通知,providers,consumers,routers,overrides
                  // 允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。
                  String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                  List<URL> categoryList = result.get(category);
                  if (categoryList == null) {
                      categoryList = new ArrayList<URL>();
                      result.put(category, categoryList);
                  }
                  categoryList.add(u);
              }
          }
          if (result.size() == 0) {
              return;
          }
      
          // 已经通知过
          Map<String, List<URL>> categoryNotified = notified.get(url);
          if (categoryNotified == null) {
              notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
              categoryNotified = notified.get(url);
          }
      
          //对这里得到的providers,configurators,routers分别进行通知
          for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
              String category = entry.getKey();
              List<URL> categoryList = entry.getValue();
              categoryNotified.put(category, categoryList);
      
              /** 更新本地缓存文件,用以保证procider 与 consumer 的通信 */
              saveProperties(url);
      
              //上面获取到的监听器进行通知 --> 到RegistryDirectory中查看notify方法
              /**
               * 对于消费者来说这里listener是RegistryDirectory
               * 而对于服务提供者来说这里是OverrideListener,是RegistryProtocol的内部类
               */
              listener.notify(categoryList);
          }
      }
      

      上叙核心点:

      • 1.缓存信息到本地,也就是注册中心挂了为啥还能通信
      • 2.注册中心变化通知,至于这里如何通知或者如何触发通知后面详细叙说,先贴出后面的监听器代码逻辑,此处获取注册中心还不会调用此处,真正触发逻辑是在后面有相关变动时;
      /**
       * @see NotifyListener
       * 注册中心的回调,也就是它之所以能根据注册中心动态变化的根源所在.
       *
       * 该方法执行完成 代表着: 有关消费者端注册到注册中心和订阅注册中心就完事儿了
       */
      @Override
      public synchronized void notify(List<URL> urls) {
      
          //三种类型分开
          List<URL> invokerUrls = new ArrayList<URL>();
          List<URL> routerUrls = new ArrayList<URL>();
          List<URL> configuratorUrls = new ArrayList<URL>();
          for (URL url : urls) {
              String protocol = url.getProtocol();
              String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
              if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                  routerUrls.add(url);
              } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                  configuratorUrls.add(url);
              } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                  invokerUrls.add(url);
              } else {
                  logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
              }
          }
          // configurators 更新缓存的服务提供方配置
          if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
              this.configurators = toConfigurators(configuratorUrls);
          }
          // routers  更新缓存的路由规则配置
          if (routerUrls != null && !routerUrls.isEmpty()) {
              List<Router> routers = toRouters(routerUrls);
              if (routers != null) { // null - do nothing
                  setRouters(routers);
              }
          }
          List<Configurator> localConfigurators = this.configurators; // local reference
          // merge override parameters
          // 合并override参数
          this.overrideDirectoryUrl = directoryUrl;
          if (localConfigurators != null && !localConfigurators.isEmpty()) {
              for (Configurator configurator : localConfigurators) {
                  this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
              }
          }
          // providers
          // 重建invoker实例,根据传入的url就行创建或者更新invoker
          refreshInvoker(invokerUrls);
      }
      

      看到这里就知道分类型全量去刷新invoker信息了,此处也就是说注册中心被注册的信息一旦变化会通知到这里触发相关监听者动作。

      /**
       * 注册中心有变化,则更新methodInvokerMap和urlInvokerMap的值
       * 这就是官网提到的它的值可能是动态变化的,比如注册中心推送变更的原因所在.
       *
       *
       * 根据invokerURL列表转换为invoker列表。转换规则如下:
       * 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
       * 2.如果传入的invoker列表不为空,则表示最新的invoker列表
       * 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
       * @param invokerUrls 传入的参数不能为null
       *
       * Convert the invokerURL list to the Invoker Map. The rules of the conversion are as follows:
       * 1.If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache, and notice that any parameter changes in the URL will be re-referenced.
       * 2.If the incoming invoker list is not empty, it means that it is the latest invoker list
       * 3.If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route rule, which needs to be re-contrasted to decide whether to re-reference.
       *
       * @param invokerUrls this parameter can't be null
       */
      // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated.
      private void refreshInvoker(List<URL> invokerUrls) {
          if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                  && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
              this.forbidden = true; // Forbid to access
              this.methodInvokerMap = null; // Set the method invoker map to null
              destroyAllInvokers(); // Close all invokers
          } else {
              this.forbidden = false; // Allow to access
              Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
              if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                  invokerUrls.addAll(this.cachedInvokerUrls);
              } else {
                  this.cachedInvokerUrls = new HashSet<URL>();
                  this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
              }
              if (invokerUrls.isEmpty()) {
                  return;
              }
      
              //会重新走一遍服务的引用过程
              //给每个提供者创建一个Invoker,
              // 将传入的invokerUrls转成具体的invoker,也就是providerurl转成invoker的过程。
      
              Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
      
              // 方法名映射Invoker列表
              Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
      
              // state change
              // If the calculation is wrong, it is not processed.
              //如果计算错误,则不进行处理.
              if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                  logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                  return;
              }
      
              //服务提供者Invoker保存在这个map中
              this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
              this.urlInvokerMap = newUrlInvokerMap;
              try {
                  destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
              } catch (Exception e) {
                  logger.warn("destroyUnusedInvokers error. ", e);
              }
          }
      }
      
    image.png
    看到核心逻辑,无非就是触发下相关缓存的更新。也就是获取注册中心过程中所隐藏的动作已经被指明了,就等着后面触发了。
    

    • 3.服务引用
      这里先引出一个问题:

      • 1.provider先初始化OK,再初始化consumer,整个监听触发过程怎样?
      • 2.consumer先初始化OK,再初始化provider,整个监听触发过程又怎样?

      先看一段代码:

      /***
       * ref方法中最后一步,服务的引用,返回的是一个Invoker
       */
      private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
      
          // 初始化Directory
          // 组装Directory,可以看成一个消费端的List,可以随着注册中心的消息推送而动态的变化服务的Invoker
          // 封装了所有服务真正引用逻辑,覆盖配置,路由规则等逻辑
          // 初始化时只需要向注册中心发起订阅请求,其他逻辑均是异步处理,包括服务的引用等
          // 缓存接口所有的提供者端Invoker以及注册中心接口相关的配置等
          RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
          directory.setRegistry(registry);
          directory.setProtocol(protocol);
      
          // all attributes of REFER_KEY
          Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
      
      
          URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
          if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
      
              // 到注册中心注册服务
              // 此处regist是上面一步获得的registry,即是ZookeeperRegistry,包含zkClient的实例
              // 会先经过AbstractRegistry的处理,然后经过FailbackRegistry的处理
              registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
          }
      
          // 订阅服务
          // 有服务提供的时候,注册中心会推送服务消息给消费者,消费者再进行服务的引用。
          directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));
      
          // Cluster会把多个Invoker伪装成一个Invoker
          // 服务的引用与变更全部由Directory异步完成
          // 合并所有相同的invoker
          // cluster 适配类的代码如下:
          Invoker invoker = cluster.join(directory);
          ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
          return invoker;
      }
      
    image.png
    image.png
    回答上面问题最好的出发点就是这里:
    directory通用认知就是consumer中所维护的invokers的一个List,但这个invokers列表是实时变化的。也就是:
    
    - 1.如果是consumer先初始化,这个directory中的methodInvokerMap和urlInvokerMap为空,待provider初始化OK 之后,借助注册中心进行notify,就会触发consumer linstener 刷新directory中的invokers列表了。
    - 2.如果是provider先初始化,consumer本身初始化是就会订阅注册中心,在订阅的同时就会触发注册中心进行一次notify,此时就会触发consumer linstener 刷新directory中的invokers列表了。
    
    因此consumer不管初始化顺序如何,相关的服务信息更新都是交由cluster中的directory去做的。而过程就是通过监听方式异步去实施。很好理解这里的directory中的invokers缓存
    
    • 详细解析
      进入核心入口:订阅服务


      image.png

      继续


      image.png
    • ZookeeperRegister:
      ZookeeperRegister没有覆盖父类的方法,进入父类FailbackRegister:

      /** 发送订阅请求 */
      @Override
      public void subscribe(URL url, NotifyListener listener) {
          super.subscribe(url, listener);
          removeFailedSubscribed(url, listener);
          try {
      
              /** 订阅 向服务器端发送订阅请求 */
              // Sending a subscription request to the server side
              doSubscribe(url, listener);
          } catch (Exception e) {
              Throwable t = e;
      
              List<URL> urls = getCacheUrls(url);
              if (urls != null && !urls.isEmpty()) {
                  notify(url, listener, urls);
                  logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
              } else {
                  /** 如果开启了启动时检测,则抛出异常 */
                  // If the startup detection is opened, the Exception is thrown directly.
                  boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true);
                  boolean skipFailback = t instanceof SkipFailbackWrapperException;
                  if (check || skipFailback) {
                      if (skipFailback) {
                          t = t.getCause();
                      }
                      throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                  } else {
                      logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                  }
              }
      
              /** 记录失败订阅,并重试 */
              // Record a failed registration request to a failed list, retry regularly
              addFailedSubscribed(url, listener);
          }
      }
      

      进入super AbstractRegistry方法:

       /**
       * 往本地存储映射关系,key是url,value是一个Set<Listener>
       */
      @Override
      public void subscribe(URL url, NotifyListener listener) {
          if (url == null) {
              throw new IllegalArgumentException("subscribe url == null");
          }
          if (listener == null) {
              throw new IllegalArgumentException("subscribe listener == null");
          }
          if (logger.isInfoEnabled()) {
              logger.info("Subscribe: " + url);
          }
      
          //先根据url获取已注册的监听器
          Set<NotifyListener> listeners = subscribed.get(url);
      
          //没有监听器,就创建,并添加进去
          if (listeners == null) {
              subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
              listeners = subscribed.get(url);
          }
      
          //有监听器,直接把当前RegistryDirectory添加进去
          listeners.add(listener);
      }
      

      没啥,缓存下listener而已,继续回到failbackRegister:
      在进入核心:doSubscribe(url, listener);方法前,异常体里也有一个很重要的点:

      • 1.重试
      • 2.check检查,也就是reference中是否配置了check检查。

      doSubscribe又是钩子,具体到ZookeeperRegiste中:

      @Override
      protected void doSubscribe(final URL url, final NotifyListener listener) {
          try {
              if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                  String root = toRootPath();
                  ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                  if (listeners == null) {
                      zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                      listeners = zkListeners.get(url);
                  }
      
                  //将zkClient的事件IZkChildListener转换到registry事件NotifyListener
                  ChildListener zkListener = listeners.get(listener);
                  if (zkListener == null) {
                      listeners.putIfAbsent(listener, new ChildListener() {
                          @Override
                          public void childChanged(String parentPath, List<String> currentChilds) {
                              for (String child : currentChilds) {
                                  child = URL.decode(child);
                                  if (!anyServices.contains(child)) {
                                      anyServices.add(child);
                                      subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener);
                                  }
                              }
                          }
                      });
                      zkListener = listeners.get(listener);
                  }
                  zkClient.create(root, false);
                  List<String> services = zkClient.addChildListener(root, zkListener);
                  if (services != null && !services.isEmpty()) {
                      for (String service : services) {
                          service = URL.decode(service);
                          anyServices.add(service);
                          subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener);
                      }
                  }
              } else {
                  List<URL> urls = new ArrayList<URL>();
                  // 这里的path分别为providers,routers,configurators三种
                  /** 分别对providers,routers,configurators三种不同类型的进行订阅,也就是往zookeeper中注册节点,注册之前先给url添加监听器。最后是订阅完之后进行通知 */
                  for (String path : toCategoriesPath(url)) {
                      ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                      if (listeners == null) {
                          zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                          listeners = zkListeners.get(url);
                      }
                      ChildListener zkListener = listeners.get(listener);
                      if (zkListener == null) {
                          listeners.putIfAbsent(listener, new ChildListener() {
      
                              /**
                               * 这里设置了监听回调的地址,即回调给FailbackRegistry中的notify
                               * 当关注的路径的下增减节点,就会触发回调,然后通过notify方法,进行业务数据的变更逻辑
                               */
                              @Override
                              public void childChanged(String parentPath, List<String> currentChilds) {
                                  ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                              }
                          });
                          zkListener = listeners.get(listener);
                      }
                      /** 创建持久节点 */
                      //创建三个节点
                      // /dubbo/.../providers/
                      // /dubbo/.../configurators/
                      // /dubbo/.../routers/
                      //上面三个路径会被消费者端监听,当提供者,配置,路由发生变化之后,
                      //注册中心会通知消费者刷新本地缓存。
                      zkClient.create(path, false);
                      /** 开始对该节点设置监听 */
                      List<String> children = zkClient.addChildListener(path, zkListener);
                      if (children != null) {
                          urls.addAll(toUrlsWithEmpty(url, path, children));
                      }
                  }
      
                  /** 下面要开始更新新的服务信息,服务启动和节点更新回调(前面设置了回调到这里)都会调用到这里 */
                  notify(url, listener, urls);
              }
          } catch (Throwable e) {
              throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
          }
      }
      

      核心事件: 分别对providers,routers,configurators三种不同类型的进行订阅,也就是往zookeeper中注册节点,注册之前先给url添加监听器。最后是订阅完之后进行通知
      [图片上传失败...(image-386dd1-1535626534271)]
      这里有两个notify,同样的逻辑,但一个是留给未来触发通知的【第一个匿名类里的】(也就是注册中心主动推送),一个是当前要触发通知的【最后一个notify】(就是consumer自己主动去触发注册中心推送)

    • 第一个匿名类中的notify:


      image.png
      image.png
      image.png
    这里很清楚了,zkListener就是对path的一个自定义监听器,将监听器转换成zk自己的监听器对象然后实现具体路径节点的监听,进而触发后续回调到自定义监听器的逻辑,也就是这里的ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
    
    • 第二个notify


      image.png

      又回到了FailbackRegistry中的方法


      image.png
      到了核心的notify了
      image.png
      /** 开始更新本地缓存文件的信息 */
      protected void notify(URL url, NotifyListener listener, List<URL> urls) {
          if (url == null) {
              throw new IllegalArgumentException("notify url == null");
          }
          if (listener == null) {
              throw new IllegalArgumentException("notify listener == null");
          }
          if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
              logger.warn("Ignore empty notify urls for subscribe url " + url);
              return;
          }
          if (logger.isInfoEnabled()) {
              logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
          }
          Map<String, List<URL>> result = new HashMap<String, List<URL>>();
      
          // 获取catagory列表,providers,routers,configurators
          for (URL u : urls) {
              if (UrlUtils.isMatch(url, u)) {
      
                  // 不同类型的数据分开通知,providers,consumers,routers,overrides
                  // 允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。
                  String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                  List<URL> categoryList = result.get(category);
                  if (categoryList == null) {
                      categoryList = new ArrayList<URL>();
                      result.put(category, categoryList);
                  }
                  categoryList.add(u);
              }
          }
          if (result.size() == 0) {
              return;
          }
      
          // 已经通知过
          Map<String, List<URL>> categoryNotified = notified.get(url);
          if (categoryNotified == null) {
              notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
              categoryNotified = notified.get(url);
          }
      
          //对这里得到的providers,configurators,routers分别进行通知
          for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
              String category = entry.getKey();
              List<URL> categoryList = entry.getValue();
              categoryNotified.put(category, categoryList);
      
              /** 更新本地缓存文件,用以保证procider 与 consumer 的通信 */
              saveProperties(url);
      
              //上面获取到的监听器进行通知 --> 到RegistryDirectory中查看notify方法
              /**
               * 对于消费者来说这里listener是RegistryDirectory
               * 而对于服务提供者来说这里是OverrideListener,是RegistryProtocol的内部类
               */
              listener.notify(categoryList);
          }
      }
      

      划重点:

      • 1.缓存
      • 2.通知本地linstener作出处理
    image.png
    image.png
    image.png

    这里补充一个核心点:


    image.png
    image.png
    整体代码如下:
    ```
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
    
            // If protocol is configured at the reference side, only the matching protocol is selected
            // 从注册中心获取到的携带提供者信息的url
            // 如果reference端配置了protocol,则只选择匹配的protocol
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                        + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);
    
            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            // 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
            /**
             * urlInvokerMap缓存详解:
             *
             * A: 消费端先启动,服务端后启动:
             * 1.消费端启动时,先从本地或者注册中心拉取 XXX./dubbo.cache文件中取一份provider列表
             * 2.如果本地缓存不存在,直接绕过远端provider的服务信息;
             * 3.若本地缓存存在,生成相关的远端provider的服务信息,置入directory中
             * 4.启动服务端,notify到消费端,从缓存中按服务端url信息取出远端相关provider信息,若相同则不从新置入信息,否则重新置入【多个服务端时,若此时只启动一个服务端,则只需要置入当前服务端信息,其他信息保留】
             *
             * A: 服务端先启动,消费端后启动:
             * 1.消费端启动时,先从本地或者注册中心拉取 XXX./dubbo.cache文件中取一份provider列表
             * 2.将服务端信息从注册中心或者本地拉取生成相关的远端provider的服务信息,置入directory中
             *
             */
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
    
                        /**
                         * 先使用DubboProtocol的refer方法,这一步会依次调用ProtocolFIlterListenerWrapper,ProtocolFilterWrapper,DubboProtocol中的refer方法。
                         * 经过两个Wrapper中,会添加对应的InvokerListener并构建Invoker Filter链,在DubboProtocol中会创建一个DubboInvoker对象,该Invoker对象持有服务Class,providerUrl,负责和服务提供端通信的ExchangeClient
                         * 接着使用得到的Invoker创建一个InvokerDelegete
                         */
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }
    ```
    
    最终调用到DubboProtocol中的refer了:
    
    image.png
    image.png
    在进入DubboProtocol的refer方法之前,会依次调用ProtocolFIlterListenerWrapper,ProtocolFilterWrapper。两个wrapper会添加对应的InvokerListener并构建Invoker Filter链,在DubboProtocol中会创建一个DubboInvoker对象,从上面可以看出该Invoker对象持有服务Class,providerUrl,负责和服务端通信的ExchangeClient。然后使用该Invoker创建一个InvokerDelegete并返回
    
    在创建ExchangeClient时有个配置是开头就提到的:延迟加载:
    
    image.png
    至于具体的连接也是交给netty去做的,这里不细说,继续看主流程:
    
    • 这里就开始更新Directory中的invokers列表了
    • 用下面的截图结束服务引用的注册机监听


      image.png

    三.发起远程调用
    • 远程调用就是将拿到的代理发起调用,此处会涉及到远程通信等一系列操作。


      image.png

      走代理进行invoke发起


      image.png

      继续跟进


      image.png
      这里为啥是MockClusterInvoker,因为在注册返回invoker的时候,Invoker invoker = cluster.join(directory);里面返回的就是MockClusterWrapper中的MockClusterInvoker,这里涉及到集群相关知识,下一章集中分析。
      切一下MockClusterInvoker主体代码:
      /**
       * 降级处理方案
       * 原理就是改变注册在zookeeper上的节点信息.从而zookeeper通知重新生成invoker
       */
      @Override
      public Result invoke(Invocation invocation) throws RpcException {
          Result result = null;
      
          String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
          if (value.length() == 0 || value.equalsIgnoreCase("false")) {
              /**
               * 无降级: no mock
               * 这里的invoker是FailoverClusterInvoker
               */
              result = this.invoker.invoke(invocation);
          } else if (value.startsWith("force")) {
              if (logger.isWarnEnabled()) {
                  logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
              }
              /**
               * 屏蔽: force:direct mock
               * mock=force:return+null
               * 表示消费方对方法的调用都直接返回null,不发起远程调用
               * 可用于屏蔽不重要服务不可用的时候,对调用方的影响
               */
              //
              result = doMockInvoke(invocation, null);
          } else {
              /**
               * 容错: fail-mock
               * mock=fail:return+null
               * 表示消费方对该服务的方法调用失败后,再返回null,不抛异常
               * 可用于对不重要服务不稳定的时候,忽略对调用方的影响 */
              try {
                  result = this.invoker.invoke(invocation);
              } catch (RpcException e) {
                  if (e.isBiz()) {
                      throw e;
                  } else {
                      if (logger.isWarnEnabled()) {
                          logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                      }
                      result = doMockInvoke(invocation, e);
                  }
              }
          }
          return result;
      }
      
    image.png

    继续


    image.png
    image.png

    继续往下看,前面一系列容错及负载均衡后面一章详细细说


    image.png image.png

    恩 这里就是真正发起远程调用了,timeout配置也出现了,但此处还不是真正使用timeout的时候,继续往下:


    image.png
    image.png

    到这里很明显是发送一个request出去,具体发送流程如下:
    AbstractPeer->AbstractClient->AbstractChannel->NettyChannel->AbstractChannel


    image.png

    到这里,请求已经通过netty发出去了,那么怎么做到请求结果返回,并且做到超时检测的呢;
    其实这里dubbo我认为它的思想是异步转同步的一个思想,也就是发出请求后,服务端异步执行,但消费端会用Future去同步轮训服务端返回结果,并且在轮训中做到超时检查的。此处继续看下代码就清楚了。

    • 请求调用异步转同步

      image.png

      看到最后一个get(),划重点:


      image.png

      看到这里去future里面循环取值了,因此也就是上面说的那样这里同步等待的道理。


      image.png
      private volatile Response response;
      

      到这里就会自然猜想,provider执行之后会发送结果到consumer,consumer收到返回后会另启线程填充这个response,这里循环监听就会监听到结果了。
      看看consumer怎么消费返回结果的:


      image.png
      image.png
      image.png
      image.png
      image.png
      image.png

      看到这里,response赋值完毕;远程调用过程结束。
      最后补充一个远程调用filter链:

      InvokerWrapper -> ProtocolFilterWrapper -> ConsumerContextFilter -> FutureFilter -> MonitorFilter -> AbstractInvoker -> DubboInvoker

    相关文章

      网友评论

        本文标题:3-dubbo源码分析之服务引用

        本文链接:https://www.haomeiwen.com/subject/ppvdwftx.html