美文网首页
08-dubbo服务引入和调用源码分析

08-dubbo服务引入和调用源码分析

作者: Coding626 | 来源:发表于2021-07-06 17:28 被阅读0次

    开头

    上一节讲到了服务的导出,即服务端如何将自己的接口提供成dubbo服务的过程,这一节就是讲服务的调用了,消费端是如何调用服务端的接口的呢?

    主要流程

    1.spring启动时,会给@Reference注解的属性赋值,赋值的时候会调用referenceBean.get方法
    2.准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的
    3.在注册中心初始化服务目录RegistryDirectory
    4.将消费端信息注册到zk
    5.构造路由链、服务订阅
    6.根据服务目录得到最终的invoker对象MockClusterInvoker
    8.最终调用MockClusterInvoker.invoke方法执行请求发送数据,里面调用了netty.send方法
    9.通过netty channel,执行nettyServerHandler方法处理请求和结果返回

    源码流程
    流程图地址:https://www.processon.com/view/link/60e02b8d637689510d6c4184

    服务引入.jpg

    1.程序入口

    在spring启动的时候,会对@Reference注解的属性赋值,生成ReferenceBean,在ReferenceAnnotationBeanPostProcessor.doGetInjectedBean方法中
    可以看到,最终调用了 referenceBean.get()方法,这个方法最后返回了一个ref对象,这个ref对象看到最后就是一个Invoke代理对象,也就是主要流程的第二步,准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的

    @Override
        protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                           InjectionMetadata.InjectedElement injectedElement) throws Exception {
    
        
            return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
        }
     private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
            if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
                return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                        wrapInvocationHandler(referenceBeanName, referenceBean));
            } else {                                    // ReferenceBean should be initialized and get immediately
                // 这里
                return referenceBean.get();
            }
        }
    
    public synchronized T get() {
            checkAndUpdateSubConfigs();
    
            if (destroyed) {
                throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
            }
            if (ref == null) {
                // 入口
                init();
            }
            return ref;  // Invoke代理
        }
    

    2.准备初始化invoker对象,MockClusterInvoker

    由init()->createProxy(map),这个方法太长了,留了三个主要的方法:
    1.加载注册中心url地址

    1. invoker = REF_PROTOCOL.refer调用registry.refer,这里又是spi机制,最终调用了registryProtocol.refer方法
    private T createProxy(Map<String, String> map) {
         List<URL> us = loadRegistries(false);
        invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
         invoker = CLUSTER.join(new StaticDirectory(u, invokers));
    
        }
    

    3.在注册中心初始化服务目录RegistryDirectory
    留下了主要代码,可以看到这里初始化了一个注册目录,也就是我们最终在zk上看到的consumers节点文件夹。
    registry.register(directory.getRegisteredConsumerUrl());这里最终会调用ZookeeperRegistry.doRegister方法,用zk客户端向zk服务端创建节点,将消费端信息注册到zk,可以看到这里创建的是临时节点

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
          
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
    
             
              registry.register(directory.getRegisteredConsumerUrl());
           
    
        
            directory.buildRouterChain(subscribeUrl);
    
          
            directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
             
        
    
            return invoker;
        }
    
     @Override
        public void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    4.构造路由链、服务订阅
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY

    3.生成最终的invoker对象MockClusterInvoker

        Invoker invoker = cluster.join(directory);
    

    这里又是SPI机制,由于Cluster有一个包装类,所以会先调用MockClusterWrapper.join方法,原理可参照我之前单独写的一节SPI源码分析
    可以看到,这里最终生成MockClusterInvoker

    public class MockClusterWrapper implements Cluster {
    
        private Cluster cluster;
    
        public MockClusterWrapper(Cluster cluster) {
            this.cluster = cluster;
        }
    
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new MockClusterInvoker<T>(directory,
                    this.cluster.join(directory));
        }
    
    }
    

    4.服务调用

    第3步骤中生成了一个MockClusterInvoker对象,所以最终调用服务的方法实际上就是调用MockClusterInvoker.invoke方法,会依次调用AbstractClusterInvoker.invoke->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke

    @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
    
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
                //no mock
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
                }
                //force:direct mock
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                try {
                    result = this.invoker.invoke(invocation);
    
                    //fix:#4585
                    if(result.getException() != null && result.getException() instanceof RpcException){
                        RpcException rpcException= (RpcException)result.getException();
                        if(rpcException.isBiz()){
                            throw  rpcException;
                        }else {
                            result = doMockInvoke(invocation, rpcException);
                        }
                    }
    
                }
            return result;
        }
    

    我们直接看DubboInvoker.doInvoke方法
    1.首先会拿到一个 ExchangeClient客户端
    2.异步请求currentClient.request,最终调用HeaderExchangeChannel.request->调用netty的方法channel.send

    @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(PATH_KEY, getUrl().getPath());
            inv.setAttachment(VERSION_KEY, version);
    
     
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
               
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
    
            try {
             
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    
                int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
    
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else {
                    AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
    
                    CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
    
                    asyncRpcResult.subscribeTo(responseFuture);
    
                    return asyncRpcResult;
                }
            }
        }
    

    5.服务请求处理

    由于使用的netty通信,所有客户端发送消息后,netty服务端会在NettyServerHandler.channelRead中接到消息,这里调用了很多handler,就不展开看了。
    1.MultiMessageHandler
    2.HeartbeatHandler
    3.AllChannelHandler
    4.DecodeHandler
    5.HeaderExchangeHandler
    6.ExchangeHandlerAdapter

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                handler.received(channel, msg);
            } finally {
    
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    

    总结

    服务的引入的目的就是在消费端@Reference标注一个服务端接口,这个注解会去将消费端消息注册到zk,最终会生成一个调用服务端的代理对象invoker,消费端调用服务端接口的时候最后调用的就是invoker.invoke方法,而这个方法采用的通信框架是netty,实现了远程调用。
    dubbo源码写的很好,比如里面的SPI机制运用的很巧妙,还有一些抽象工厂设计模式等,源码值得品读。

    相关文章

      网友评论

          本文标题:08-dubbo服务引入和调用源码分析

          本文链接:https://www.haomeiwen.com/subject/nepzyltx.html