Dubbo Source Code Reading (4): Analysis of the Dubbo Service Publishing Flow

Author: 桥头放牛娃 | Published: 2019-01-29 20:20

    Note: this article analyzes the service publishing flow under the following configuration:
    Registry: zookeeper;
    Publishing protocol: dubbo

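    1. Analysis of the service publishing flow

    The service publishing flow is shown in the figure below:

    [Figure: dubbo服务发布流程.jpg (Dubbo service publishing flow)]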

    (1) The application declares the service with <dubbo:service>

        <!-- Declare the service interface to be exposed -->
        <dubbo:service id="dubboTestApi" interface="dubbo.demo.api.DubboTestApi" ref="dubboTestApiImpl" cache="false" />
    

    (2) DubboNamespaceHandler

    Module: dubbo-config -> dubbo-config-spring
    Package: org.apache.dubbo.config.spring.schema
    Main method: init()
    Main source:

    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
    

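    This method registers the parsers for Dubbo's custom XML tags; in particular, the <dubbo:service> element is parsed into a ServiceBean.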

    (3) ServiceBean.export()

    Module: dubbo-config -> dubbo-config-spring
    Package: org.apache.dubbo.config.spring
    Main method: export() (the body shown is defined in the parent class ServiceConfig)
    Main source:

    public synchronized void export() {
        checkAndUpdateSubConfigs();
    
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }
    
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
    

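    Main flow:

    • Read the export and delay attributes from the provider configuration when they are not set on the service itself;
    • If export=false, the service is not to be exposed, so return immediately;
    • If delay is a positive value, exposure is deferred by scheduling doExport() on the delay executor; otherwise doExport() is called right away.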
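
    The three branches described above can be tried out in isolation. The following is a minimal sketch, not Dubbo's code: the class ExportSketch and its return strings are hypothetical, and only the shape of the export/delay decision follows the snippet.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the export()/doExport() pair shown above.
class ExportSketch {
    private final ScheduledExecutorService delayExportExecutor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "delay-export");
                t.setDaemon(true); // do not keep the JVM alive for a pending export
                return t;
            });
    private final AtomicBoolean exported = new AtomicBoolean(false);

    // export == null means "not configured", which is treated like true.
    String export(Boolean export, Integer delayMillis) {
        if (export != null && !export) {
            return "skipped";
        }
        if (delayMillis != null && delayMillis > 0) {
            delayExportExecutor.schedule(this::doExport, delayMillis.longValue(), TimeUnit.MILLISECONDS);
            return "scheduled";
        }
        doExport();
        return "exported";
    }

    private void doExport() {
        exported.set(true); // the real method builds URLs and opens the server
    }

    boolean isExported() {
        return exported.get();
    }
}
```

    Returning a label from export() is only a convenience for observing which branch was taken; the real method returns void.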

    (4) ServiceBean.doExport()

    Module: dubbo-config -> dubbo-config-spring
    Package: org.apache.dubbo.config.spring
    Main method: doExport() (defined in the parent class ServiceConfig)
    Main source:

    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
    
        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
        doExportUrls();
    }
    

    (5) ServiceBean.doExportUrls()

    Module: dubbo-config -> dubbo-config-spring
    Package: org.apache.dubbo.config.spring
    Main method: doExportUrls() (defined in the parent class ServiceConfig)
    Main source:

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    

    This exports the service once for each configured protocol, passing the list of configured registries to every export.

    (6) ServiceBean.doExportUrlsFor1Protocol()

    Module: dubbo-config -> dubbo-config-spring
    Package: org.apache.dubbo.config.spring
    Main method: doExportUrlsFor1Protocol() (defined in the parent class ServiceConfig)
    Main source:

    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
    Exporter<?> exporter = protocol.export(wrapperInvoker);
    exporters.add(exporter);
    

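    Main flow:

    • proxyFactory generates an Invoker that proxies the service implementation; the proxyFactory implementation is JavassistProxyFactory;
    • protocol exports that Invoker; the protocol implementation here is RegistryProtocol. For why RegistryProtocol is the implementation used, see the note at the end of step (9).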

    (7) JavassistProxyFactory.getInvoker()

    Module: dubbo-rpc -> dubbo-rpc-api
    Package: org.apache.dubbo.rpc.proxy.javassist
    Main method: getInvoker()
    Main source:

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    

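    Flow:

    • Wrapper.getWrapper() dynamically generates a wrapper class for the service class;
    • An AbstractProxyInvoker is created whose doInvoke() delegates to that wrapper.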

    (8) Wrapper.getWrapper()

    Module: dubbo-common
    Package: org.apache.dubbo.common.bytecode
    Main method: getWrapper()
    Main source:

    public static Wrapper getWrapper(Class<?> c) {
        while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
        {
            c = c.getSuperclass();
        }
    
        if (c == Object.class) {
            return OBJECT_WRAPPER;
        }
    
        Wrapper ret = WRAPPER_MAP.get(c);
        if (ret == null) {
            ret = makeWrapper(c);
            WRAPPER_MAP.put(c, ret);
        }
        return ret;
    }
    

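    getWrapper() relies on makeWrapper() to generate the wrapper class dynamically and caches the result in WRAPPER_MAP; makeWrapper() builds the class by concatenating source text and compiling it at runtime. The key generated method is invokeMethod(), which takes the target instance, method name, parameter types and arguments and calls the matching method on the actual implementation.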
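
    To make the idea concrete, the generated class behaves roughly like a hand-written dispatcher that selects the target method by name instead of going through reflection. The sketch below is an illustration only: GreetingService, GreetingServiceImpl and GreetingWrapper are hypothetical names, and the code that makeWrapper() really generates also covers overloads, property access and exception wrapping.

```java
// Hypothetical service interface and implementation.
interface GreetingService {
    String hello(String name);
    int add(int a, int b);
}

class GreetingServiceImpl implements GreetingService {
    public String hello(String name) { return "hello " + name; }
    public int add(int a, int b) { return a + b; }
}

// Hand-written equivalent of a generated wrapper's invokeMethod():
// dispatch on the method name and cast the arguments, with no reflection.
class GreetingWrapper {
    Object invokeMethod(Object instance, String methodName,
                        Class<?>[] parameterTypes, Object[] args) {
        GreetingService target = (GreetingService) instance;
        if ("hello".equals(methodName) && parameterTypes.length == 1) {
            return target.hello((String) args[0]);
        }
        if ("add".equals(methodName) && parameterTypes.length == 2) {
            return target.add((Integer) args[0], (Integer) args[1]);
        }
        throw new IllegalArgumentException("Method " + methodName + " not found");
    }
}
```

    Because the branches are plain method calls, the JVM can inline them, which is the usual motivation for generating such a class rather than calling Method.invoke().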

    (9) RegistryProtocol.export()

    Module: dubbo-registry -> dubbo-registry-api
    Package: org.apache.dubbo.registry.integration
    Main method: export()
    Main source:

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);
    
        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
        // url to registry
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }
    
        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    

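    Main flow:

    • getRegistryUrl() returns the registry URL, which starts with "zookeeper://xxx.xxx.xx.xxx:2181/" and is followed by the interface parameters and other information;
    • getProviderUrl() returns the service provider URL, which starts with "dubbo://xxx.xxx.xxx.xxx:20880/" and is followed by the interface parameters and other information;
    • getSubscribedOverrideUrl() returns the URL to subscribe to for override configuration;
    • doLocalExport() performs the local export;
    • getRegistry() obtains the registry;
    • If register is true, the configuration asks for registration, so register() writes the interface information to the registry;
    • subscribe() subscribes to changes of the interface information.

    Note: for why RegistryProtocol is the one invoked here, see https://www.jianshu.com/p/336310167bcb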
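
    The relation between the two URLs can be illustrated with plain strings: the provider URL travels as the URL-encoded export parameter of the registry URL, which is what addParameterAndEncoded(Constants.EXPORT_KEY, ...) in step (6) sets up. The sketch below uses only the JDK; the class UrlNestingSketch and the addresses in the usage note are made up for illustration, and Dubbo's own URL class is deliberately not used.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

class UrlNestingSketch {
    // Append the provider URL to the registry URL as an encoded "export" parameter.
    static String withExport(String registryUrl, String providerUrl) {
        try {
            String sep = registryUrl.contains("?") ? "&" : "?";
            return registryUrl + sep + "export=" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Recover the provider URL from the "export" parameter, or null if absent.
    static String providerUrlOf(String registryUrl) {
        int q = registryUrl.indexOf('?');
        if (q < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(q + 1).split("&")) {
            if (pair.startsWith("export=")) {
                try {
                    return URLDecoder.decode(pair.substring("export=".length()), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }
}
```

    For example, nesting "dubbo://192.168.0.10:20880/dubbo.demo.api.DubboTestApi?side=provider" into a "zookeeper://192.168.0.1:2181/..." URL and calling providerUrlOf() on the result gives back the original provider URL, because the inner "?" and "&" are percent-encoded and cannot be confused with the outer ones.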

    (10) RegistryProtocol.doLocalExport()

    Module: dubbo-registry -> dubbo-registry-api
    Package: org.apache.dubbo.registry.integration
    Main method: doLocalExport()
    Main source:

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
    
                    final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
    

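    This mainly calls DubboProtocol.export() to expose the interface; the resulting exporter is cached in bounds so that each invoker is exported only once.
    Note: for why DubboProtocol is the one invoked here, see https://www.jianshu.com/p/336310167bcb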
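
    The lookup in doLocalExport() is a double-checked read of the bounds map. A minimal sketch of the same once-per-key guarantee follows; it swaps in ConcurrentHashMap.computeIfAbsent instead of the synchronized block of the original, and the class ExportCache is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ExportCache<V> {
    private final Map<String, V> bounds = new ConcurrentHashMap<>();
    private final AtomicInteger exportCount = new AtomicInteger();

    // Runs the exporter at most once per key; later calls return the cached value.
    V getOrExport(String key, Function<String, V> exporter) {
        return bounds.computeIfAbsent(key, k -> {
            exportCount.incrementAndGet();
            return exporter.apply(k);
        });
    }

    // Number of times an exporter function was actually executed.
    int exportCount() {
        return exportCount.get();
    }
}
```

    computeIfAbsent keeps the check and the insertion atomic for a given key, so the explicit second null check inside a lock is no longer needed in this variant.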

    (11) DubboProtocol.export()

    Module: dubbo-rpc -> dubbo-rpc-dubbo
    Package: org.apache.dubbo.rpc.protocol.dubbo
    Main method: export()
    Main source:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
    
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
    
        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
    
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
    

    This mainly calls openServer() to open the local server that exposes the service.

    (12) DubboProtocol.openServer()

    Module: dubbo-rpc -> dubbo-rpc-dubbo
    Package: org.apache.dubbo.rpc.protocol.dubbo
    Main method: openServer()
    Main source:

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }
    

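    This mainly creates and initializes the ExchangeServer through createServer(); if a server already exists for the same address, it is reused and only reset with the new URL.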

    (13) DubboProtocol.createServer()

    Module: dubbo-rpc -> dubbo-rpc-dubbo
    Package: org.apache.dubbo.rpc.protocol.dubbo
    Main method: createServer()
    Main source:

    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }
    
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
    

    The server-side ExchangeServer is created through Exchangers.bind().

    (14) Exchangers.bind()

    Module: dubbo-remoting -> dubbo-remoting-api
    Package: org.apache.dubbo.remoting.exchange
    Main method: bind()
    Main source:

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }
    

    getExchanger() loads the Exchanger through SPI; the implementation loaded is HeaderExchanger, so bind() actually calls HeaderExchanger.bind().
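
    The selection by name can be pictured as a small registry that maps an extension name to a lazily created singleton. The sketch below is a simplification, not Dubbo's ExtensionLoader: all type names are hypothetical, and the parameter name "exchanger" with default "header" is stated here as an assumption about how the URL selects the implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified extension point.
interface ExchangerSketch {
    String name();
}

class HeaderExchangerSketch implements ExchangerSketch {
    public String name() { return "header"; }
}

class ExtensionRegistrySketch {
    private final Map<String, Supplier<ExchangerSketch>> factories = new HashMap<>();
    private final Map<String, ExchangerSketch> instances = new HashMap<>();

    synchronized void register(String name, Supplier<ExchangerSketch> factory) {
        factories.put(name, factory);
    }

    // Creates each named extension at most once and then reuses it.
    synchronized ExchangerSketch getExtension(String name) {
        Supplier<ExchangerSketch> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return instances.computeIfAbsent(name, n -> factory.get());
    }

    // Pick the extension named by the URL parameter, falling back to the default.
    // The key "exchanger" and default "header" are assumptions for this sketch.
    ExchangerSketch select(Map<String, String> urlParameters) {
        return getExtension(urlParameters.getOrDefault("exchanger", "header"));
    }
}
```

    With no explicit parameter in the URL, select() falls back to the "header" entry, which matches the behaviour described above where HeaderExchanger ends up handling bind().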

    (15) HeaderExchanger.bind()

    Module: dubbo-remoting -> dubbo-remoting-api
    Package: org.apache.dubbo.remoting.exchange.support.header
    Main method: bind()
    Main source:

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
    

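    This creates an ExchangeServer of type HeaderExchangeServer; the actual bind() is performed by Transporters.bind(). The handler passed to it is a DecodeHandler, which wraps a HeaderExchangeHandler, which in turn wraps handler; that handler is the requestHandler object in DubboProtocol.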
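
    The nesting is a plain decorator chain, so a message passes through the layers from the outside in. The sketch below only records the order in which the layers are entered; HandlerSketch, TracingLayer and HandlerChainSketch are hypothetical types, and the layer names are labels, not the real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal handler contract.
interface HandlerSketch {
    void received(Object message);
}

// A layer that records its own name and then delegates to the wrapped handler.
class TracingLayer implements HandlerSketch {
    private final String name;
    private final HandlerSketch next; // null for the innermost handler
    private final List<String> trace;

    TracingLayer(String name, HandlerSketch next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    public void received(Object message) {
        trace.add(name);
        if (next != null) {
            next.received(message);
        }
    }
}

class HandlerChainSketch {
    // Builds DecodeHandler(HeaderExchangeHandler(requestHandler)) and sends one message.
    static List<String> dispatch(Object message) {
        List<String> trace = new ArrayList<>();
        HandlerSketch requestHandler = new TracingLayer("requestHandler", null, trace);
        HandlerSketch chain = new TracingLayer("DecodeHandler",
                new TracingLayer("HeaderExchangeHandler", requestHandler, trace), trace);
        chain.received(message);
        return trace;
    }
}
```

    Calling HandlerChainSketch.dispatch("ping") returns the layers in the order DecodeHandler, HeaderExchangeHandler, requestHandler, which is the same order in which the constructors are nested in bind().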

    (16) Transporters.bind()

    Module: dubbo-remoting -> dubbo-remoting-api
    Package: org.apache.dubbo.remoting
    Main method: bind()
    Main source:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
    

    This ends up calling NettyTransporter.bind(), which handles the underlying connections.
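
    The handler selection in Transporters.bind() has two cases: a single handler is used as-is, while several handlers are wrapped in a dispatcher. The following sketch mirrors that branch with java.util.function.Consumer standing in for ChannelHandler; DispatcherSketch is a hypothetical class, and forwarding every message to all handlers is the assumed behaviour of the dispatcher.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class DispatcherSketch {
    // One handler is returned directly; several are combined into a dispatcher
    // that forwards each message to all of them in order.
    @SafeVarargs
    static Consumer<String> combine(Consumer<String>... handlers) {
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        if (handlers.length == 1) {
            return handlers[0];
        }
        List<Consumer<String>> all = Arrays.asList(handlers.clone());
        return message -> all.forEach(h -> h.accept(message));
    }
}
```

    In the export path analyzed here only one handler (the DecodeHandler chain) is passed in, so the dispatcher branch is not taken.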

    (17) RegistryProtocol.getRegistry()

    Module: dubbo-registry -> dubbo-registry-api
    Package: org.apache.dubbo.registry.integration
    Main method: getRegistry()
    Main source:

    private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }
    

    This obtains the registry URL and then gets the Registry implementation through RegistryFactory.getRegistry().

    (18) ZookeeperRegistryFactory.getRegistry()

    Module: dubbo-registry -> dubbo-registry-api
    Package: org.apache.dubbo.registry.support
    Main method: getRegistry() (inherited from AbstractRegistryFactory)
    Main source:

    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    

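    This mainly creates the actual Registry, namely ZookeeperRegistry, through createRegistry(), and caches it in REGISTRIES under a lock so that only one instance exists per registry.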

    (19) ZookeeperRegistryFactory.createRegistry()

    Module: dubbo-registry -> dubbo-registry-zookeeper
    Package: org.apache.dubbo.registry.zookeeper
    Main method: createRegistry()
    Main source:

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
    

    This uses the factory pattern to create the concrete Registry class, ZookeeperRegistry.

    (20) RegistryProtocol.register()

    Module: dubbo-registry -> dubbo-registry-api
    Package: org.apache.dubbo.registry.integration
    Main method: register()
    Main source:

    public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }
    

    This obtains the registry implementation and registers the service provider with it.

    (21) ZookeeperRegistry.register()

    Module: dubbo-registry -> dubbo-registry-zookeeper
    Package: org.apache.dubbo.registry.zookeeper
    Main method: register()
    Main source:

    public void register(URL url) {
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;
    
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
    
            // Record a failed registration request to a failed list, retry regularly
            addFailedRegistered(url);
        }
    }
    

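    Flow:

    • Add the service provider URL to the registered list;
    • Remove this URL from the failed-registration list and the failed-unregistration list;
    • Call doRegister() to perform the actual registration with the registry;
    • If registration fails and startup checking does not require the exception to be thrown, add the URL to the failed-registration list so that it is retried after a delay.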
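
    The retry-on-failure behaviour can be sketched without a real registry. The code below covers only the path where the failure is recorded rather than thrown (the check == false case in the snippet); FailbackRegistrySketch is a hypothetical class, retry() is called by hand instead of by a timer, and plain strings stand in for URLs.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

class FailbackRegistrySketch {
    private final Set<String> registered = new LinkedHashSet<>();
    private final Set<String> failedRegistered = new LinkedHashSet<>();
    private final Consumer<String> doRegister; // throws to simulate an unreachable registry

    FailbackRegistrySketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url) {
        registered.add(url);          // remember what should be registered
        failedRegistered.remove(url); // clear any stale failure record
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            failedRegistered.add(url); // picked up by the next retry
        }
    }

    // Invoked periodically in a real failback registry; here called manually.
    void retry() {
        for (String url : new LinkedHashSet<>(failedRegistered)) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // still failing: keep the URL for the next round
            }
        }
    }

    Set<String> failed() {
        return failedRegistered;
    }
}
```

    Iterating over a copy of the failed set in retry() avoids a ConcurrentModificationException when a successful retry removes the entry.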

    (22) ZookeeperRegistry.doRegister()

    Module: dubbo-registry -> dubbo-registry-zookeeper
    Package: org.apache.dubbo.registry.zookeeper
    Main method: doRegister()
    Main source:

    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

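    This mainly calls ZookeeperClient.create() to create a node in the zookeeper cluster; the node is ephemeral when the dynamic parameter is true, which is the default.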

    (23) CuratorZookeeperClient.create()

    Module: dubbo-remoting -> dubbo-remoting-zookeeper
    Package: org.apache.dubbo.remoting.zookeeper.curator
    Main method: create()
    Main source:

    public void create(String path, boolean ephemeral) {
        if (!ephemeral) {
            if (checkExists(path)) {
                return;
            }
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            create(path.substring(0, i), false);
        }
        if (ephemeral) {
            createEphemeral(path);
        } else {
            createPersistent(path);
        }
    }
    

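    This mainly creates the directories and the node on the zookeeper cluster: parent paths are created recursively as persistent nodes, and only the last segment may be ephemeral.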
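
    The recursion is easy to follow with an in-memory stand-in for the node tree. The sketch below keeps the control flow of create() and replaces the zookeeper calls with a map; ZkTreeSketch is a hypothetical class and the path in the usage note is illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ZkTreeSketch {
    // path -> true if the node is ephemeral, false if persistent
    private final Map<String, Boolean> nodes = new LinkedHashMap<>();

    void create(String path, boolean ephemeral) {
        if (!ephemeral && nodes.containsKey(path)) {
            return; // persistent node already exists
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            create(path.substring(0, i), false); // parents are always persistent
        }
        nodes.put(path, ephemeral);
    }

    // Paths in creation order.
    List<String> paths() {
        return new ArrayList<>(nodes.keySet());
    }

    boolean isEphemeral(String path) {
        return Boolean.TRUE.equals(nodes.get(path));
    }
}
```

    Creating "/dubbo/demo.Api/providers/node-1" as ephemeral produces four nodes in order: "/dubbo", "/dubbo/demo.Api" and "/dubbo/demo.Api/providers" as persistent nodes, then the leaf as an ephemeral node, which disappears from a real cluster when the provider's session ends.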

    (24) ZookeeperRegistry.subscribe()

    Module: dubbo-registry -> dubbo-registry-zookeeper
    Package: org.apache.dubbo.registry.zookeeper
    Main method: subscribe()
    Main source:

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;
    
            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }
    
            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }
    

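    Main flow:

    • Add the URL and its listener to the subscribed list;
    • Remove this URL from the failed-subscription list;
    • Call doSubscribe() to create the listener for the URL;
    • If the subscription fails, add the URL and its listener to the failed-subscription list.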

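    2. How the server handles client requests

    2.1 Transport-layer processing

    (1) Creating the Netty server

    The following source is how NettyServer opens the server-side network listener: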

    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
    
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));
    
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();
    
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    
    }
    
    
    
    protected static Codec2 getChannelCodec(URL url) {
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName));
        }
    }
    

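    From the code above:

    • The codec is returned by getCodec(), which in fact obtains it through getChannelCodec(). getChannelCodec() uses the SPI extension mechanism to look up the codec that corresponds to the exposed protocol, here the dubbo protocol, whose implementation class is DubboCodec;
    • nettyServerHandler is the handler that carries the actual server-side work. It is only a thin wrapper over the corresponding NettyServer methods, and NettyServer as a handler is in turn a thin wrapper over the handler passed to its constructor; the implementation of that handler is shown in the code below.

    Note: the focus here is the flow, so the implementation details of DubboCodec are not analyzed further.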

    (2) Creating the Exchanger-layer server:

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
    

    From the above, the handler inside NettyServer is a DecodeHandler, and DecodeHandler is a thin wrapper around HeaderExchangeHandler. HeaderExchangeHandler dispatches by message type and hands the actual request to handler, which here is the requestHandler object in DubboProtocol; requestHandler is an anonymous subclass of ExchangeHandlerAdapter.

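    2.2 Server-side request handling flow

    The main call flow for handling a request is shown below:

    [Figure: dubbo服务器对客户端调用的处理.jpg (how the Dubbo server handles a client call)]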

    (1) DubboCodec.decode()

    Module: dubbo-rpc -> dubbo-rpc-dubbo
    Package: org.apache.dubbo.rpc.protocol.dubbo
    Main method: decode()
    Main source:

    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
        // check length.
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }
    
        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);
    
        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }
    
        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    
        try {
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
    

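    This performs the basic decoding of the incoming data: it checks the magic number, waits until the complete header and body have arrived, and then hands the message to decodeBody() for further decoding.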
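
    The offsets used in decode() and decodeBody() imply a 16-byte header: bytes 0-1 magic, byte 2 flag, byte 3 status, bytes 4-11 request id, bytes 12-15 body length. The sketch below parses that layout with java.nio.ByteBuffer. DubboHeaderSketch is a hypothetical class; HEADER_LENGTH = 16 and the magic value 0xdabb are assumptions taken from the Dubbo protocol as commonly documented, since the snippet does not show them. Unlike the original, which falls back to the parent codec on a magic mismatch, the sketch simply rejects the frame.

```java
import java.nio.ByteBuffer;

class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;      // assumed value, not shown in the snippet
    static final short MAGIC = (short) 0xdabb; // assumed value, not shown in the snippet

    final byte flag;
    final byte status;
    final long requestId;
    final int bodyLength;

    private DubboHeaderSketch(byte flag, byte status, long requestId, int bodyLength) {
        this.flag = flag;
        this.status = status;
        this.requestId = requestId;
        this.bodyLength = bodyLength;
    }

    // Returns null when more bytes are needed, like DecodeResult.NEED_MORE_INPUT.
    static DubboHeaderSketch parse(byte[] data) {
        if (data.length < HEADER_LENGTH) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(data); // big-endian by default
        if (buf.getShort(0) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int len = buf.getInt(12);
        if (data.length < HEADER_LENGTH + len) {
            return null; // body not complete yet
        }
        return new DubboHeaderSketch(buf.get(2), buf.get(3), buf.getLong(4), len);
    }

    // Builds a frame with the same layout, for trying out parse().
    static byte[] frame(byte flag, byte status, long requestId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putShort(MAGIC).put(flag).put(status).putLong(requestId).putInt(body.length).put(body);
        return buf.array();
    }
}
```

    Returning null for a short buffer corresponds to the two NEED_MORE_INPUT checks in decode(): one for an incomplete header and one for an incomplete body.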

    (2) DubboCodec.decodeBody()

    Module: dubbo-rpc -> dubbo-rpc-dubbo
    Package: org.apache.dubbo.rpc.protocol.dubbo
    Main method: decodeBody()
    Main source:

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status.
            byte status = header[3];
            res.setStatus(status);
            try {
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                if (status == Response.OK) {
                    Object data;
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, in);
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, in);
                    } else {
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            result.decode();
                        } else {
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } else {
                    res.setErrorMessage(in.readUTF());
                }
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            return res;
        } else {
            // decode request.
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(true);
            }
            try {
                Object data;
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, in);
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, in);
                } else {
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }
    
            return req;
        }
    }
    

    Here, a request message is decoded into a DecodeableRpcInvocation object, which is then handed to the concrete business-layer handler for processing.
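
    The decoding code above branches on bits of the header flag byte (for example `(flag & FLAG_TWOWAY) != 0`). The following is a minimal sketch of that bit test. The constant values are an assumption about the Dubbo exchange header layout and are not shown in this article, and `HeaderFlagSketch` is a hypothetical class name.

```java
// Minimal sketch of header flag decoding.
// ASSUMPTION: the constant values below mirror the Dubbo exchange header layout;
// they are not taken from the code shown in this article.
final class HeaderFlagSketch {
    static final byte FLAG_REQUEST = (byte) 0x80;  // message is a request
    static final byte FLAG_TWOWAY = (byte) 0x40;   // sender expects a response
    static final byte FLAG_EVENT = (byte) 0x20;    // heartbeat or other event
    static final int SERIALIZATION_MASK = 0x1f;    // low 5 bits: serialization id

    static boolean isRequest(byte flag) {
        return (flag & FLAG_REQUEST) != 0;
    }

    static boolean isTwoWay(byte flag) {
        return (flag & FLAG_TWOWAY) != 0;
    }

    static boolean isEvent(byte flag) {
        return (flag & FLAG_EVENT) != 0;
    }

    static int serializationId(byte flag) {
        return flag & SERIALIZATION_MASK;
    }

    public static void main(String[] args) {
        // A two-way request whose body uses serialization id 2.
        byte flag = (byte) (FLAG_REQUEST | FLAG_TWOWAY | 2);
        System.out.println("request=" + isRequest(flag)
                + ", twoWay=" + isTwoWay(flag)
                + ", event=" + isEvent(flag)
                + ", serialization=" + serializationId(flag));
    }
}
```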

    (3)、DecodeHandler.received()

    Module: dubbo-remoting -> dubbo-remoting-api
    Package: org.apache.dubbo.remoting.transport
    Main method: received()
    Main source:

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }
    
        if (message instanceof Request) {
            decode(((Request) message).getData());
        }
    
        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }
    
        handler.received(channel, message);
    }
    

    This step mainly delegates to handler.received(); the handler actually invoked here is HeaderExchangeHandler.received().
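
    The decode-then-delegate behaviour of DecodeHandler can be sketched as a small decorator. This is a simplified illustration, not Dubbo's code: `Decodeable`, `Handler`, `LazyPayload` and `DecodingHandler` below are hypothetical stand-ins that only keep the parts needed to show the idea.

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch of a decorator that decodes a message before delegating.
// All types here are hypothetical stand-ins, not Dubbo classes.
final class DecodeHandlerSketch {
    interface Decodeable {
        void decode() throws Exception;
    }

    interface Handler {
        void received(Object message);
    }

    // A payload that keeps raw bytes until decode() is called.
    static final class LazyPayload implements Decodeable {
        private final byte[] raw;
        private String decoded;

        LazyPayload(byte[] raw) {
            this.raw = raw;
        }

        @Override
        public void decode() {
            if (decoded == null) {
                decoded = new String(raw, StandardCharsets.UTF_8);
            }
        }

        String value() {
            return decoded;
        }
    }

    // Decodes the message if needed, then passes it to the wrapped handler.
    static final class DecodingHandler implements Handler {
        private final Handler next;

        DecodingHandler(Handler next) {
            this.next = next;
        }

        @Override
        public void received(Object message) {
            if (message instanceof Decodeable) {
                try {
                    ((Decodeable) message).decode();
                } catch (Exception e) {
                    // Sketch only: a real handler would log the failure.
                }
            }
            next.received(message);
        }
    }

    // Sends text through the decorator and returns what the inner handler saw.
    static String deliver(String text) {
        LazyPayload payload = new LazyPayload(text.getBytes(StandardCharsets.UTF_8));
        String[] seen = new String[1];
        Handler business = message -> seen[0] = ((LazyPayload) message).value();
        new DecodingHandler(business).received(payload);
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello"));
    }
}
```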

    (4)、HeaderExchangeHandler.received()

    Module: dubbo-remoting -> dubbo-remoting-api
    Package: org.apache.dubbo.remoting.exchange.support.header
    Main method: received()
    Main source:

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        handleRequest(exchangeChannel, request);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    

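    Main processing flow:

    • If the message is a Request: an event message goes to event handling; a message that requires a reply (two-way) is passed to handleRequest() for further processing; a message that needs no reply is passed directly to the underlying handler.received();
    • If the message is a Response, handleResponse() is called to process it;
    • If the message is a String, the communication is coming through telnet; on the server side, handler.telnet() is called to process it.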
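
    The branching described above can be condensed into a small routing function. This is a sketch only: `Request` and `Response` are hypothetical stand-ins, and the returned strings simply name the branch of HeaderExchangeHandler.received() that would be taken.

```java
// Minimal sketch of the message routing in received().
// Request and Response are hypothetical stand-ins for the Dubbo classes.
final class ExchangeDispatchSketch {
    static final class Request {
        final boolean event;
        final boolean twoWay;

        Request(boolean event, boolean twoWay) {
            this.event = event;
            this.twoWay = twoWay;
        }
    }

    static final class Response {
    }

    // Returns the name of the branch that would handle the message.
    static String route(Object message, boolean clientSide) {
        if (message instanceof Request) {
            Request request = (Request) message;
            if (request.event) {
                return "handlerEvent";
            }
            return request.twoWay ? "handleRequest" : "handler.received";
        } else if (message instanceof Response) {
            return "handleResponse";
        } else if (message instanceof String) {
            // Telnet commands are only accepted on the server side.
            return clientSide ? "log error" : "handler.telnet";
        }
        return "handler.received";
    }

    public static void main(String[] args) {
        System.out.println(route(new Request(false, true), false));
        System.out.println(route(new Response(), true));
        System.out.println(route("status", false));
    }
}
```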

    (5)、HeaderExchangeHandler.handleRequest()

    Module: dubbo-remoting -> dubbo-remoting-api
    Package: org.apache.dubbo.remoting.exchange.support.header
    Main method: handleRequest()
    Main source:

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
    
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
    
            channel.send(res);
            return;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            CompletableFuture<Object> future = handler.reply(channel, msg);
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
            future.whenComplete((result, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(result);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
    

    This step calls handler.reply() to process the message, obtains the result, and sends it back to the client.
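
    The way a CompletableFuture is turned into a response can be illustrated with the JDK alone. In this sketch, `Response` is a hypothetical stand-in with plain string statuses, and a single whenComplete() callback covers both the already-completed and the still-pending case (a callback attached to a completed future runs immediately).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Minimal sketch: send a response once the reply future completes.
// Response is a hypothetical stand-in, not the Dubbo class.
final class ReplySketch {
    static final class Response {
        String status;
        Object result;
        String error;
    }

    static void respond(CompletableFuture<Object> future, Consumer<Response> send) {
        Response res = new Response();
        future.whenComplete((result, t) -> {
            if (t == null) {
                res.status = "OK";
                res.result = result;
            } else {
                res.status = "SERVICE_ERROR";
                res.error = String.valueOf(t);
            }
            send.accept(res);
        });
    }

    public static void main(String[] args) {
        // Already completed: the callback runs synchronously.
        respond(CompletableFuture.completedFuture((Object) "pong"),
                r -> System.out.println(r.status + " " + r.result));

        // Completed later with a failure: the callback runs on completion.
        CompletableFuture<Object> pending = new CompletableFuture<>();
        respond(pending, r -> System.out.println(r.status + " " + r.error));
        pending.completeExceptionally(new IllegalStateException("boom"));
    }
}
```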

    (6)、DubboProtocol.requestHandler.reply()

    Module: dubbo-rpc -> dubbo-rpc-dubbo
    Package: org.apache.dubbo.rpc.protocol.dubbo
    Main method: reply()
    Main source:

    public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext rpcContext = RpcContext.getContext();
            boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
            if (supportServerAsync) {
                CompletableFuture<Object> future = new CompletableFuture<>();
                rpcContext.setAsyncContext(new AsyncContextImpl(future));
            }
            rpcContext.setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
    
            if (result instanceof AsyncRpcResult) {
                return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
            } else {
                return CompletableFuture.completedFuture(result);
            }
        }
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }
    

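    Processing flow:

    • getInvoker(): looks up the Invoker that corresponds to the service interface named in the invocation;
    • invoker.invoke(): calls the concrete interface implementation and obtains the returned data.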
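
    At the end of reply(), a synchronous result and an asynchronous result are both returned as a CompletableFuture. The sketch below shows that normalization with JDK types only. It is an illustration under assumptions: a plain CompletableFuture stands in for AsyncRpcResult, any other object stands in for a synchronous result, and `ResultAdapterSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch: expose sync and async results through one future type.
// A CompletableFuture plays the role of AsyncRpcResult here (assumption).
final class ResultAdapterSketch {
    static CompletableFuture<Object> toFuture(Object result) {
        if (result instanceof CompletableFuture) {
            // Async case: pass the eventual value through unchanged.
            return ((CompletableFuture<?>) result).thenApply(r -> (Object) r);
        }
        // Sync case: the value is already available.
        return CompletableFuture.completedFuture(result);
    }

    public static void main(String[] args) {
        System.out.println(toFuture("sync value").join());

        CompletableFuture<String> async = new CompletableFuture<>();
        CompletableFuture<Object> adapted = toFuture(async);
        System.out.println("done before completion: " + adapted.isDone());
        async.complete("async value");
        System.out.println(adapted.join());
    }
}
```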

    (7)、DubboProtocol.getInvoker()

    Module: dubbo-rpc -> dubbo-rpc-dubbo
    Package: org.apache.dubbo.rpc.protocol.dubbo
    Main method: getInvoker()
    Main source:

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(Constants.PATH_KEY);
        // if it's callback service on client side
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }
        //callback
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " +
                    exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
        }
    
        return exporter.getInvoker();
    }
    

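    This step uses the interface information (port, path, version, group) to look up the matching DubboExporter in exporterMap; this exporter is the one created during the provider's service export flow.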
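
    The lookup relies on the provider and the consumer building the same key from port, path, version and group. The sketch below assumes a key of the form `group/path:version:port`, with an empty group, an empty version and the version `0.0.0` omitted. That format is an assumption about serviceKey() and is not shown in this article; `ExporterLookupSketch` and its plain `Map<String, Object>` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of building a service key and looking up an exporter.
// ASSUMPTION: key format is group/path:version:port (not confirmed by this article).
final class ExporterLookupSketch {
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    // Fails loudly when nothing was exported under the key, like getInvoker() does.
    static Object lookup(Map<String, Object> exporterMap, String key) {
        Object exporter = exporterMap.get(key);
        if (exporter == null) {
            throw new IllegalStateException("Not found exported service: " + key
                    + " in " + exporterMap.keySet());
        }
        return exporter;
    }

    public static void main(String[] args) {
        Map<String, Object> exporterMap = new ConcurrentHashMap<>();
        String key = serviceKey(20880, "dubbo.demo.api.DubboTestApi", null, null);
        exporterMap.put(key, "exporter for DubboTestApi");
        System.out.println(key + " -> " + lookup(exporterMap, key));
    }
}
```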

    (8)、AbstractProxyInvoker.invoke()

    Module: dubbo-rpc -> dubbo-rpc-api
    Package: org.apache.dubbo.rpc.proxy
    Main method: invoke()
    Main source:

    public Result invoke(Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        try {
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            if (RpcUtils.isReturnTypeFuture(invocation)) {
                return new AsyncRpcResult((CompletableFuture<Object>) obj);
            } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
                return new AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
            } else {
                return new RpcResult(obj);
            }
        } catch (InvocationTargetException e) {
            // TODO async throw exception before async thread write back, should stop asyncContext
            if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    This step calls the subclass's doInvoke() to perform the actual interface call.

    (9)、AbstractProxyInvoker.doInvoke()

    Module: dubbo-rpc -> dubbo-rpc-api
    Package: org.apache.dubbo.rpc.proxy
    Main method: doInvoke()
    Main source:

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    

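    This is where the dynamic proxy is created during service export, specifically in the JavassistProxyFactory class. It creates an anonymous subclass of AbstractProxyInvoker that overrides doInvoke() and, inside it, calls the wrapper class's invokeMethod() to handle the call. The doInvoke() called in the previous step is the one implemented by this anonymous class.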
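
    The combination of a template method (invoke) and an anonymous subclass (doInvoke) can be sketched with the JDK alone. Note the substitution: plain reflection stands in for the Javassist-generated Wrapper, and `SimpleInvoker` is a hypothetical, much reduced stand-in for AbstractProxyInvoker. As in the original, an exception thrown by the target method comes back as the result instead of propagating.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Minimal sketch of invoke() delegating to an overridden doInvoke().
// Reflection replaces Dubbo's generated Wrapper here (assumption for illustration).
final class ProxyInvokerSketch {
    abstract static class SimpleInvoker<T> {
        private final T target;

        SimpleInvoker(T target) {
            this.target = target;
        }

        // Template method: common error handling around the actual call.
        final Object invoke(String methodName, Class<?>[] types, Object[] args) {
            try {
                return doInvoke(target, methodName, types, args);
            } catch (InvocationTargetException e) {
                // The business exception is returned as the result.
                return e.getTargetException();
            } catch (Throwable e) {
                throw new IllegalStateException("Failed to invoke " + methodName, e);
            }
        }

        protected abstract Object doInvoke(T target, String methodName,
                                           Class<?>[] types, Object[] args) throws Throwable;
    }

    // Factory that returns an anonymous subclass, as the proxy factory does.
    static <T> SimpleInvoker<T> getInvoker(T target) {
        return new SimpleInvoker<T>(target) {
            @Override
            protected Object doInvoke(T t, String methodName,
                                      Class<?>[] types, Object[] args) throws Throwable {
                Method method = t.getClass().getMethod(methodName, types);
                return method.invoke(t, args);
            }
        };
    }

    public static void main(String[] args) {
        SimpleInvoker<String> invoker = getInvoker("abc");
        System.out.println(invoker.invoke("length", new Class<?>[0], new Object[0]));
        System.out.println(invoker.invoke("charAt", new Class<?>[] {int.class}, new Object[] {10}));
    }
}
```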

    (10)、The interface's Wrapper class

    Module: dubbo-rpc -> dubbo-rpc-api
    Package: org.apache.dubbo.rpc.proxy
    Main method: invokeMethod()
    Main source:

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    

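    The wrapper class for the interface is generated by dynamic compilation in Wrapper.getWrapper(); its invokeMethod() compares the method name and calls the matching method of the actual implementation class.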

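    (11)、Calling the interface's concrete implementation class

    Below is a sample of the dynamically generated invokeMethod. The interface is DubboTestApi and it has a single method, echoTest. The generated code shows that it dispatches on the method name supplied by the client and calls the corresponding method of the concrete implementation class.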

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        dubbo.demo.api.DubboTestApi w;
        try {
            w = ((dubbo.demo.api.DubboTestApi) $1);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("echoTest".equals($2) && $3.length == 1) {
                return ($w) w.echoTest((java.lang.String) $4[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.demo.api.DubboTestApi.");
    }
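
    In the generated source above, `$1` to `$4` are Javassist placeholders for the four method parameters and `($w)` boxes a primitive return value. A hand-written equivalent in plain Java might look like the sketch below. The signature `String echoTest(String)` is an assumption (the article only shows that the method takes one String argument), and `EchoWrapperSketch` is a hypothetical name.

```java
import java.lang.reflect.InvocationTargetException;

// Hand-written equivalent of the generated invokeMethod, for illustration.
// ASSUMPTION: echoTest takes and returns a String.
final class EchoWrapperSketch {
    interface DubboTestApi {
        String echoTest(String msg);
    }

    static Object invokeMethod(Object target, String name, Class<?>[] types, Object[] args)
            throws InvocationTargetException {
        DubboTestApi w;
        try {
            w = (DubboTestApi) target;
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            // Dispatch by method name and parameter count, without reflection.
            if ("echoTest".equals(name) && types.length == 1) {
                return w.echoTest((String) args[0]);
            }
        } catch (Throwable e) {
            throw new InvocationTargetException(e);
        }
        throw new IllegalArgumentException(
                "Not found method \"" + name + "\" in class DubboTestApi.");
    }

    public static void main(String[] args) throws Exception {
        DubboTestApi impl = msg -> "echo: " + msg;
        Object result = invokeMethod(impl, "echoTest",
                new Class<?>[] {String.class}, new Object[] {"hi"});
        System.out.println(result);
    }
}
```

    Because the dispatch is an ordinary if chain on the method name, the call reaches the implementation class without going through java.lang.reflect at invocation time.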
    
    
