美文网首页
Dubbo 3.0源码剖析

Dubbo 3.0源码剖析

作者: 王侦 | 来源:发表于2023-02-24 17:00 被阅读0次

    1.手写模拟Dubbo

    • 定义一个接口,服务提供者需要实现该接口。该接口需要打成一个jar包,然后服务提供者和调用者都要依赖该接口的jar包。
    • Dubbo需要处理Http请求(HttpServer)、Dubbo请求
    • 服务提供者需要启动HttpServer,比如Tomcat还要添加DispatchServlet
    • DispatchServlet#service调用Handler处理请求,服务调用者RPC调用服务提供者,需要传递的参数包括(封装成Invocation,序列化与反序列化统一为JDK序列化方式):
       接口名
       方法名
       方法参数类型列表
       方法参数值列表
       版本号
    • 服务提供者从请求里面拿到Invocation对象后,需要找到对应接口实现类的执行方法。那到底可以调用哪些实现类呢?服务提供者这边需要对可以向外暴露的实现类进行本地注册。
      然后根据Invocation中的接口拿到实现类,然后使用Invocation方法名、参数调用实现类对应的方法。
    • 如果服务提供者对应接口有多个版本的实现类,则Invocation需要加入版本号,然后通过指定版本号执行指定版本的实现类。
    • 服务调用者需要远程调用,比如HttpClient,对Invocation进行JDK序列化,发送请求,然后接收响应。
    • 服务调用者升级)可直接拿到接口的动态代理实现类,然后直接通过该实现类进行RPC调用。把RPC调用逻辑封装到该代理实现类里面。ProxyFactory#getProxy获取JDK动态代理,将RPC逻辑封装到InvocationHandler#invoke里面去。
    • 注册中心+负载均衡)服务提供者地址:ip + port需要可配置,而不是写死。需要根据服务名称可以直接获取到ip + port。这其实就是注册中心的功能,服务提供者需要向注册中心进行远程注册,存入Map<接口名,List<URL>>。然后服务调用者从注册中心拉取服务提供者信息,然后进行负载均衡选取一个服务提供者的URL,然后进行RPC调用。
    • 服务容错+重试服务)调用服务出错,调用指定的容错类方法,该方法的逻辑非常灵活,可以返回错误说明等。某个服务调用失败,可以进行指定次数的重试,下次重试调用要避开失败的服务实例。
    • Mock机制)模拟、伪造服务提供者。
    • 支持多协议)比如支持Tomcat以及Netty。抽象出一个接口Protocol,start()启动协议,send()基于协议发送数据。然后DubboProtocol(基于Netty)和HttpProtocol(基于Tomcat)都要实现Protocol接口。然后再提供一个ProtocolFactory工厂,根据传入的类型创建对应的协议。最后通过配置动态切换协议,程序代码不用变。(更进一步)服务提供者向注册中心注册的时候,带上协议,也即URL里面加入一个protocol。服务调用者可以根据拉取到的服务实例中URL的protocol创建对应的协议进行RPC调用。
    • 优化升级,向Dubbo靠拢
      1)URL除了protocol,hostname,port,再增加两个信息:interfaceName以及implClass。
      2)Protocol接口更改:export和refer方法。
      3)实现类DubboProtocol和HttpProtocol。
      服务提供者调用DubboProtocol#export向本地以及远程注册URL,并且启动Netty;服务调用者调用DubboProtocol#refer生成DubboInvoker对象。
      HttpProtocol#export向本地及远程注册URL,并且启动HttpServer;HttpProtocol#refer生成HttpInvoker对象。
      4)Invoker接口:invoke(Invocation);
      5)DubboInvoker及HttpInvoker实现Invoker接口。DubboInvoker通过netty实现调用,参数包括url及Invocation。HttpInvoker通过HttpClient实现调用,参数包括url及Invocation。
      6)CulsterInvoker也实现了Invoker接口,持有List<Invoker>。CulsterInvoker#join,根据服务接口名称获取List<URL>,然后对每一个URL生成对应的Invoker,加入到List<Invoker>里面。然后在CulsterInvoker#invoke里面进行负载均衡调用。

    总结:

    • 1)核心是Protocol
    • 2)服务提供者,构建URL,然后根据URL中的Protocol获取实际的Protocol,调用其export进行服务导出。向本地以及远程注册URL,并且启动相应的服务器(Netty或者Tomcat)。
    • 3)服务调用者,动态代理,其InvocationHandler#invoke中首先创建Invocation,然后根据Invocation中的服务接口名从注册中心拿到对应的List<URL>,然后对每个URL根据其协议生成对应的Invoker(DubboInvoker或者HttpInvoker)。Invoker会根据对应的协议使用对应的客户端进行RPC调用,传递的参数就是Invocation。
    • 4)服务提供者,NettyServerHandler#channelRead、HttpServerHandler#handle会从参数Invocation根据服务接口名、版本号获取对应的实现服务(本地服务注册表),然后根据方法名、方法参数进行实际调用。

    2.服务导出

    几个核心步骤:

    • 1)构建服务URL(协议、ip、port)
    • 2)启动Tomcat/Netty,要先启动,后注册,否则调用端拉到URL,提供端没启动,就没法调通
    • 3)向注册中心注册(接口名:URL)

    Dubbo接口级的服务注册 vs SpringCloud应用级的服务注册:


    @EnableDubbo
    -> @EnableDubboConfig
    -> @Import(DubboConfigConfigurationRegistrar.class)
    -> DubboConfigConfigurationRegistrar是个ImportBeanDefinitionRegistrar,调用registerBeanDefinitions()
    -> DubboSpringInitializer#initialize
    -> DubboBeanUtils#registerCommonBeans
    -> 注册DubboDeployApplicationListener监听器监听ContextRefreshedEvent事件。
    -> DubboDeployApplicationListener#onContextRefreshedEvent
    -> DefaultModuleDeployer#start
    1)服务导出exportServices();
    2)服务引入referServices();
    3)应用级注册onModuleStarted();

    DubboDeployApplicationListener#onApplicationEvent,监听ContextRefreshedEvent事件。

        public void onApplicationEvent(ApplicationContextEvent event) {
            if (nullSafeEquals(applicationContext, event.getSource())) {
                if (event instanceof ContextRefreshedEvent) {
                    onContextRefreshedEvent((ContextRefreshedEvent) event);
                } else if (event instanceof ContextClosedEvent) {
                    onContextClosedEvent((ContextClosedEvent) event);
                }
            }
        }
    

    2.1 Dubbo3.0之前接口级的服务注册

    注册过程:

    • 1)@DubboService,解析注解得到实现类UserServiceImpl和接口UserService,生成一个对象ServiceConfig(包含UserService、UserServiceImpl、version等)
    • 2)ServiceConfig#export进行服务导出
      2-1)确定协议(应用配置),生成URL;
      2-2)启动Netty、Jetty;
      2-3)URL存到注册中心;

    应用的每个接口都会注册到注册中心里面:


    2.2 Dubbo3.0应用级的服务注册

    Dubbo3.0为了兼容之前版本,既会进行接口级服务注册,也会进行应用级的服务注册。

    注册过程:

    • 1)所有接口服务注册完成;(可以关掉register-mode:instance,只进行应用级注册);
    • 2)所有接口会向注册中心mapping注册一个<接口名:实例名(应用名)>
    • 3)(默认没有进行注册,metadata-type改为remote才启用)会将应用元数据信息(应用提供了哪些Dubbo服务接口)存到元数据中心的metadata(元数据中心和注册中心可以共用一个,也可以分开);(metadata-type为local启动服务接口,在这里进行导出)另外一个方法,服务提供者会提供一个元数据Dubbo服务接口MetadataService,消费者可以调用该服务的getMetadataInfo获取元数据信息。
    • 4)然后才进行应用级注册,只用注册一次,应用名: 实例ip+port,这里port默认取Dubbo协议端口,如果没有Dubbo则取第一个协议端口(ServiceInstanceHostPortCustomizer#customize);这里还会存值,包括dubbo.endpoints(port, protocol)。

    如果服务暴露了Triple协议+20880端口,并且metadata-type为local,则应用级注册为:ip + 20881(优先是Dubbo的ip+port,这里是MetadataService)。因为此时MetadataService会暴露为Dubbo协议+20881(默认的20880被占用)。

    怎么判断端口是否被占用?

    • MetadataService -> ServiceConfig#export -> NetUtils#getAvailablePort会对port进行遍历,new ServerSocket(i),如果抛异常,则表示被占用,则尝试i++。

    只根据应用级注册的信息,服务消费者(调用者)怎么办?

    • 1)根据服务名(接口)可以拿到实例 ip+port。有两个问题,问题一,你怎么根据接口名找到应用名(实例名);问题二,你怎么确定该服务提供者是否导出了?
    • 2)问题一,就是通过注册时,注册<接口名:实例名(应用名)>解决;
    • 3)问题二,通过元数据中心metadata可以查到该应用实例提供了哪些接口服务、协议、超时时间(然后就可以生成对应的Invoker,比如DubboInvoker、TripleInvoker)

    那这里,如果应用配置相同的协议(dubbo)有两个端口,在dubbo.endpoints里怎么处理?

    • 只会存一个,这里是个bug,放到Map<Protocol, Port>,这里会覆盖。
    • ProtocolPortsMetadataCustomizer#customize
    • 因此服务消费者只会生成一个URL,但是服务导出有两个URL。

    如果配置Triple协议,有两个端口,强制走接口FORCE_INTERFACE,会有另外一个Bug,也只会调用一个端口,另一个端口无法调用。

    • ServiceConfig#doExportUrl,根据接口、实现类对象ref、服务URL生成一个Invoker;JavassistProxyFactory,执行服务接口方法时,最后都会调用到具体实现类ref的相应方法。
    • TripleProtocol#export服务导出时,会进行本地注册,放到path2Invoker.put(path, invoker),接口名字 -> invoker(实现类),这里如果协议相同、端口不同,第二个URL生成的Invoker会覆盖第一个URL的Invoker,本地注册有问题。
    • 那DubboProtocol#export为什么没有问题,因为本地注册时,它的key生成时,包含了端口号,而不仅仅是接口名字。

    元数据中心特点:

    • 1)数据不怎么变化
    • 2)这里面的数据跟服务消费者(调用者)没有太大的关系

    如上图,应用级注册的值里面也会标识metadata是local还是remote,也就是指示服务消费者是到元数据中心查询还是通过Dubbo调用服务提供者的MetadataService来获取元数据信息。

    另外还有一个重要应用,通过元数据信息只能查到<接口/服务, 协议>,是没有端口的,那这个端口在哪里?就在应用级注册的值里面:

    • metadata -> dubbo.endpoints里面,会有port以及对应的协议。

    1)元数据默认方式:

    • 服务提供者提供MetadataService,会向注册中心进行注册,用的老一套的接口级注册


    2)元数据向元数据中心注册的方式:


    3.服务引入

    @DubboReference
    -> ReferenceConfig
    -> ReferenceConfig#get 生成代理对象
    -> RegistryProtocol#refer
    -> RegistryProtocol#doRefer 生成MigrationInvoker
    A)FORCE_INTERFACE,强制使用接口级服务引入
    B)FORCE_APPLICATION,强制使用应用级服务引入
    C)APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个

    总结一下整体服务消费者(调用者)的查找流程:

    • step1.根据服务名/接口名DemoService,去dubbo/mapping查找应用名
    • step2.根据应用名dubbo-springboot-demo-provider去应用级注册信息services查找实例,可能有多个(192.168.65.61:20881),然后找出实例值里面的dubbo.metadata.storage-type(local后者remote)以及dubbo.endpoints(port, protocol)。
    • step3.根据dubbo.metadata.storage-type查找实例元数据信息(远程就是元数据中心,如果共用zk,就是dubbo/metadata;如果是local,就是调用dubbo服务MetadataService),获取到服务(接口)对应的协议,例如DemoService:tri
    • step4.找到服务接口 + 协议后,根据step2中获取到的dubbo.endpoints,就能获取到服务接口+协议对应的端口
    • step5.最后遍历所有实例,生成URL(跟dubbo2.7一样),然后过滤(比如@DubboReference(protocol = "dubbo")),最终生成对应的Invoker(TripleInvoke和DubooInvoker),包装成ClusterInvoker,最后生成接口DemoServcie的代理对象。
      tri://实例ip:对应端口/DemoService
      dubbo://实例ip:对应端口/DemoService
    • step6.调用
      ClusterInvoker#invoke -> TripleInvoker#invoke -> 使用指定的协议发送请求到服务提供者:实例ip + triple协议端口

    ServiceDiscoveryRegistryDirectory#subscribe
    -> ServiceDiscoveryRegistry#doSubscribe
    -> serviceNameMapping.getAndListen(),获取接口对应的应用名
    -> serviceDiscovery.getInstances() 根据应用名,从/services/应用名节点查出所有实例
    -> serviceDiscovery.getRemoteMetadata(),获取应用的元数据(从元数据中心或元数据服务获取)

    服务消费者监听什么变化?

    • 监听 /dubbo/services/应用名节点 下面的变化,就是服务实例重启后发生的变化
    • ServiceInstancesChangedListener#doOnEvent
      调用serviceDiscovery.getRemoteMetadata(),重新去获取应用的元数据
    • 另外缓存metaCacheManager有过期机制,会更新

    MigrationInvoker

    # dubbo.application.service-discovery.migration 仅支持通过 -D 以及 全局配置中心 两种方式进行配置。
    dubbo.application.service-discovery.migration=APPLICATION_FIRST
    
    # 可选值 
    # FORCE_INTERFACE,强制使用接口级服务引入
    # FORCE_APPLICATION,强制使用应用级服务引入
    # APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个
    

    事实上,在进行某个服务的服务引入时,会统一利用InterfaceCompatibleRegistryProtocol的refer来生成一个MigrationInvoker对象,在MigrationInvoker中有三个属性:

    private volatile ClusterInvoker<T> invoker;  // 用来记录接口级ClusterInvoker
    private volatile ClusterInvoker<T> serviceDiscoveryInvoker; // 用来记录应用级的ClusterInvoker
    private volatile ClusterInvoker<T> currentAvailableInvoker; // 用来记录当前使用的ClusterInvoker,要么是接口级,要么应用级
    

    RegistryProtocol#doRefer

    • migrationInvoke = getMigrationInvoker()
    • RegistryProtocol#interceptInvoker()
      -> listener.onRefer()
      -> MigrationRuleListener#onRefer
      -> MigrationRuleHandler#doMigrate
      -> MigrationInvoker#migrateToApplicationFirstInvoker
      -> refreshInterfaceInvoker() 接口级ClusterInvoker (2.7版本)
       refreshServiceDiscoveryInvoker 应用级ClusterInvoker (3.0版本)
       calcPreferredInvoker()选择哪个ClusterInvoker的逻辑

    MigrationInvoker#invoke

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            // currentAvailableInvoker要么是接口级ClusterInvoker,要么是应用级ClusterInvoker
            if (currentAvailableInvoker != null) {
                if (step == APPLICATION_FIRST) {
    
                    // call ratio calculation based on random value
                    // 在同时支持接口级和应用级的情况下,如果promotion小于100,则每次调用时,生成一个100以内的随机数,如果随机数大于promotion,则走接口级ClusterInvoker进行服务调用
                    // 表示支持部分走接口级调用,部分走应用级调用,看随机数
                    // promotion默认等于100,所以默认不会支持部分
                    if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                        return invoker.invoke(invocation);
                    }
                }
    
                return currentAvailableInvoker.invoke(invocation);
            }
    
            switch (step) {
                case APPLICATION_FIRST:
                    if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
                        currentAvailableInvoker = serviceDiscoveryInvoker;
                    } else if (checkInvokerAvailable(invoker)) {
                        currentAvailableInvoker = invoker;
                    } else {
                        currentAvailableInvoker = serviceDiscoveryInvoker;
                    }
                    break;
                case FORCE_APPLICATION:
                    currentAvailableInvoker = serviceDiscoveryInvoker;
                    break;
                case FORCE_INTERFACE:
                default:
                    currentAvailableInvoker = invoker;
            }
    
            return currentAvailableInvoker.invoke(invocation);
        }
    

    4.服务调用

    4.1 服务导出TripleProtocol协议的流程

    TripleProtocol#export

        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            checkProtobufVersion(url);
            // 本地注册--->实例类
            String key = serviceKey(url);
    
            // 服务导出器,用来卸载服务时做一些善后处理
            final AbstractExporter<T> exporter = new AbstractExporter<T>(invoker) {
                @Override
                public void afterUnExport() {
                    pathResolver.remove(url.getServiceKey());
                    pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(),
                        invoker);
                    // set service status
                    triBuiltinService.getHealthStatusManager()
                        .setStatus(url.getServiceKey(), ServingStatus.NOT_SERVING);
                    triBuiltinService.getHealthStatusManager()
                        .setStatus(url.getServiceInterface(), ServingStatus.NOT_SERVING);
                    exporterMap.remove(key);
                }
            };
    
            exporterMap.put(key, exporter);
    
            invokers.add(invoker);
    
            pathResolver.add(url.getServiceKey(), invoker); // url 20882
            pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);
    
            // set service status
            triBuiltinService.getHealthStatusManager()
                .setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
            triBuiltinService.getHealthStatusManager()
                .setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
    
            // 启动服务器,用来处理HTTP2的请求
            PortUnificationExchanger.bind(invoker.getUrl());
            return exporter;
        }
    

    PortUnificationExchanger#bind

        public static void bind(URL url) {
            // servers表示可以同时运行多个PortUnificationServer,只需要绑定的host+port不一样即可
            servers.computeIfAbsent(url.getAddress(), addr -> {
                final PortUnificationServer server = new PortUnificationServer(url);
                // 运行NettyServer,并绑定ip和port
                server.bind();
                return server;
            });
        }
    
        public void bind() {
            if (channel == null) {
                doOpen();
            }
        }
    
        protected void doOpen() {
            bootstrap = new ServerBootstrap();
    
            bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
            workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                EVENT_LOOP_WORKER_POOL_NAME);
    
            bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
    
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        // ch是Socket连接
                        final ChannelPipeline p = ch.pipeline();
    //                        p.addLast(new LoggingHandler(LogLevel.DEBUG));
    
                        final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
                        if (enableSsl) {
                            p.addLast("negotiation-ssl", new SslServerTlsHandler(getUrl()));
                        }
    
                        // 初始化SocketChannel,并在pipeline中绑定PortUnificationServerHandler
                        final PortUnificationServerHandler puHandler = new PortUnificationServerHandler(url, protocols);
                        p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
                        p.addLast("negotiation-protocol", puHandler);
                        channelGroup = puHandler.getChannels();
                    }
                });
            // bind
    
            String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
            int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
            if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
                bindIp = ANYHOST_VALUE;
            }
            InetSocketAddress bindAddress = new InetSocketAddress(bindIp, bindPort);
            ChannelFuture channelFuture = bootstrap.bind(bindAddress);
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
        }
    

    核心是PortUnificationServerHandler。


    ByteToMessageDecoder#channelRead

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    first = cumulation == null;
                    cumulation = cumulator.cumulate(ctx.alloc(),
                            first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Exception e) {
                    throw new DecoderException(e);
                } finally {
                    try {
                        if (cumulation != null && !cumulation.isReadable()) {
                            numReads = 0;
                            cumulation.release();
                            cumulation = null;
                        } else if (++numReads >= discardAfterReads) {
                            // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                            // See https://github.com/netty/netty/issues/4275
                            numReads = 0;
                            discardSomeReadBytes();
                        }
    
                        int size = out.size();
                        firedChannelRead |= out.insertSinceRecycled();
                        fireChannelRead(ctx, out, size);
                    } finally {
                        out.recycle();
                    }
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    

    ByteToMessageDecoder#callDecode

        protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while (in.isReadable()) {
                    int outSize = out.size();
    
                    if (outSize > 0) {
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
    
                        // Check if this handler was removed before continuing with decoding.
                        // If it was removed, it is not safe to continue to operate on the buffer.
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/4635
                        if (ctx.isRemoved()) {
                            break;
                        }
                        outSize = 0;
                    }
    
                    int oldInputLength = in.readableBytes();
                    decodeRemovalReentryProtection(ctx, in, out);
    
                    // Check if this handler was removed before continuing the loop.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See https://github.com/netty/netty/issues/1664
                    if (ctx.isRemoved()) {
                        break;
                    }
    
                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes()) {
                            break;
                        } else {
                            continue;
                        }
                    }
    
                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                        ".decode() did not read anything but decoded a message.");
                    }
    
                    if (isSingleDecode()) {
                        break;
                    }
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Exception cause) {
                throw new DecoderException(cause);
            }
        }
    
        final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                throws Exception {
            decodeState = STATE_CALLING_CHILD_DECODE;
            try {
                decode(ctx, in, out);
            } finally {
                boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
                decodeState = STATE_INIT;
                if (removePending) {
                    fireChannelRead(ctx, out, out.size());
                    out.clear();
                    handlerRemoved(ctx);
                }
            }
        }
    

    这里会调用PortUnificationServerHandler#decode

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
           // ctx.channel() 拿到的是NioSocketChannel 接收到数据ByteBuf后,会先解码,然后再触发Handler的channelRead
    
            // 根据前5个字节确定对应的协议
            // Will use the first five bytes to detect a protocol.
            if (in.readableBytes() < 5) {
                return;
            }
    
            // 看是不是HTTP2.0
            for (final WireProtocol protocol : protocols) {
                in.markReaderIndex();
                final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
                in.resetReaderIndex();
                switch (result) {
                    case UNRECOGNIZED:
                        continue;
                    case RECOGNIZED:
                        // 符合个某个协议后,再給Socket连接对应的pipeline绑定Handler
                        protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
                        ctx.pipeline().remove(this);
                    case NEED_MORE_DATA:
                        return;
                    default:
                        return;
                }
            }
            // Unknown protocol; discard everything and close the connection.
            in.clear();
            ctx.close();
        }
    

    总结一下后续流程:

    • Http2ProtocolDetector#detect,如何要建立的是一个HTTP2连接,那么在建立完Socket连接后,客户端会发送一个连接前言,也就是一串字节(对应的字符串为:“PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n”),给到服务端,服务端从而知道要建立的是一个HTTP2的连接
    • 符合HTTP2协议后,会删除本身的Handler,通过TripleHttp2Protocol#configServerPipeline添加处理Triple协议的Handler
      1)Http2FrameCodec,配置HTTP2,比如max-concurrent-streams、max-frame-size;
      2)TripleServerConnectionHandler用来处理Http2PingFrame、Http2GoAwayFrame;
      3)Http2MultiplexHandler是用来创建子Channel的,并且ChannelInitializer是用来初始化子Channel,一个Socket连接对应一个NioSocketChannel,下层可以设置多个子Channel,每个子Channel对应一个HTTP2Stream(TripleCommandOutBoundHandler:将QueuedCommand转换成Http2StreamFrame,然后再发出去;TripleHttp2FrameServerHandler:处理的是HTTP2Stream所对应的子Channel,核心Handler, 用来处理Http2HeadersFrame、Http2DataFrame, lookupExecutor会根据服务url得到一个线程池,每个子Channel对应一个线程池?还是共享一个线程池?默认是共享一个。)
      4)TripleTailHandler释放ByteBuf的内存空间

    重点看一下TripleHttp2Protocol#configServerPipeline

            // Http2MultiplexHandler是用来创建子Channel的,并且ChannelInitializer是用来初始化子Channel
            // 一个Socket连接对应一个NioSocketChannel,下层可以设置多个子Channel,每个子Channel对应一个HTTP2Stream
            final Http2MultiplexHandler handler = new Http2MultiplexHandler(
    
                // Channel初始化器,用来初始化传入进来的Channel,比如HTTP2Stream所对应的Channel
                new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        final ChannelPipeline p = ch.pipeline();
                        // 将QueuedCommand转换成Http2StreamFrame,然后再发出去
                        p.addLast(new TripleCommandOutBoundHandler());
    
                        // TripleHttp2FrameServerHandler处理的是HTTP2Stream所对应的子Channel
                        // 核心Handler, 用来处理Http2HeadersFrame、Http2DataFrame, lookupExecutor会根据服务url得到一个线程池
                        // 每个子Channel对应一个线程池?还是共享一个线程池?默认是共享一个
                        p.addLast(new TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
                            filters));
                    }
                });
    

    核心是TripleHttp2FrameServerHandler。

    4.2 服务引入TripleProtocol协议的流程

        public TripleInvoker(Class<T> serviceType,
            URL url,
            String acceptEncodings,
            ConnectionManager connectionManager,
            Set<Invoker<?>> invokers,
            ExecutorService streamExecutor) {
            super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY, TOKEN_KEY});
            this.invokers = invokers;
            // 与服务提供者建立Socket连接
            this.connection = connectionManager.connect(url);
            this.acceptEncodings = acceptEncodings;
            this.streamExecutor = streamExecutor;
        }
    

    MultiplexProtocolConnectionManager#connect

        public Connection connect(URL url) {
            // 协议相同的URL将对应同一个ConnectionManager
            final ConnectionManager manager = protocols.computeIfAbsent(url.getProtocol(), this::createSingleProtocolConnectionManager);
            return manager.connect(url);
        }
    

    SingleProtocolConnectionManager#connect

        public Connection connect(URL url) {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            // 不同的address对应不同的Connection对象
            return connections.compute(url.getAddress(), (address, conn) -> {
                if (conn == null) {
                    final Connection created = new Connection(url);
                    created.getClosePromise().addListener(future -> connections.remove(address, created));
                    return created;
                } else {
                    conn.retain();
                    return conn;
                }
            });
        }
    
        public Connection(URL url) {
            url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
            url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
            this.url = url;
            this.protocol = ExtensionLoader.getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
            this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
            this.remote = getConnectAddress();
            // 只是创建一个Bootstrap对象,并不会建立Socket连接
            this.bootstrap = create();
        }
    
        private Bootstrap create() {
            final Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP.get())
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .remoteAddress(remote)
                    .channel(socketChannelClass());
    
            final ConnectionHandler connectionHandler = new ConnectionHandler(this);
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    SslContext sslContext = null;
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        pipeline.addLast("negotiation", new SslClientTlsHandler(url));
                    }
    
                    //.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    // TODO support IDLE
    //                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                    pipeline.addLast(connectionHandler);
                    protocol.configClientPipeline(url, pipeline, sslContext);
                    // TODO support Socks5
                }
            });
            return bootstrap;
        }
    

    构造TripleInvoker,创建connection只是创建一个Bootstrap对象,并不会建立Socket连接。

    4.3 调用逻辑

    4.3.1 服务消费者发送请求

    那Socket连接什么时候创建呢?

    • 第一次使用TripleInvoker#invoke
      -> TripleInvoker#doInvoke
      -> Connection#isAvailable
      -> Connection#connect
      -> Bootstrap#connect()
     StreamObserver<String> streamObserver = demoService.sayHelloBiStream(new ZhouyuResultStreamObserver());
     streamObserver.onNext("zhouyu1");
     streamObserver.onCompleted();
    

    TripleInvoker#doInvoke

    • 1)检查Socket连接是否可用,如果不可用并且没有初始化,那就连接服务端创建Socket连接
    • 2)拿到服务接口信息和当前调用的方法信息
    • 3)根据方法区分调用类型
      UNARY -> invokeUnary();
      SERVER_STREAM -> invokeServerStream();
      CLIENT_STREAM/ BI_STREAM -> invokeBiOrClientStream();

    TripleInvoker#invokeBiOrClientStream

    • 1)拿到处理响应的responseObserver,也就是上面调用方法时传入的ZhouyuResultStreamObserver;
    • 2)创建requestObserver,streamCall方法中就会创建一个Stream,并且返回一个StreamObserver对象,可以利用这个requestObserver来向Stream中发送数据,将responseObserver封装为ClientCall.Listener,ClientCall.Listener是用来接收响应数据的。

    TripleInvoker#streamCall

    • 将responseObserver封装为ClientCall.Listener
    • call.start()返回一个ClientStreamObserver,用来发送数据

    ClientCall#start

    • this.stream = new ClientStream(),在构造方法里面, this.writeQueue = createWriteQueue(parent);

    ClientStream#createWriteQueue

        private WriteQueue createWriteQueue(Channel parent) {
            // 利用Netty开启一个Http2StreamChannel,也就是HTTP2中的流
            final Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
            final Future<Http2StreamChannel> future = bootstrap.open().syncUninterruptibly();
            if (!future.isSuccess()) {
                throw new IllegalStateException("Create remote stream failed. channel:" + parent);
            }
    
            // 并绑定两个Handler,一个工作在发送数据时,一个工作在接收数据时
            final Http2StreamChannel channel = future.getNow();
            channel.pipeline()
                .addLast(new TripleCommandOutBoundHandler())
                // TripleHttp2ClientResponseHandler是用来接收响应的
                .addLast(new TripleHttp2ClientResponseHandler(createTransportListener()));
    
            // 基于Http2StreamChannel创建一个WriteQueue
            // 后续把要发送的数据,添加到WriteQueue中就能发送出去了
            return new WriteQueue(channel);
        }
    

    当用户程序调用streamObserver.onNext()发送数据时,实际调用的是:

    • ClientCallToObserverAdapter#onNext
    • ClientCall#sendMessage
      1)发送请求头
      2)发送请求体
       把要发送的message进行序列化,得到字节数组;
       看是否需要压缩数据;
       stream.writeMessage(compress, compressed);发送数据,发送的是请求体,在请求体的最开始最记录当前请求体是否被压缩,压缩只会的数据长度是多少(DataQueueCommand,表示HTTP2中的Http2DataFrame,用的就是gRPC发送请求体的格式);

    发送的核心逻辑WriteQueue#enqueue(QueuedCommand):

    • queue.add(command);这里的queue是ConcurrentLinkedQueue
    • scheduleFlush();调用channel.eventLoop().execute(this::flush);
    • 发送逻辑在WriteQueue#flush
      1)while循环从queue.poll()获取发送数据cmd
      2)cmd.run(channel);将数据帧添加到Http2StreamChannel中,添加并不会立马发送,调用了flush才发送
      3)连续从队列中取到了128个数据帧就flush一次,channel.flush();这里处理高并发情况,128就发送一次
      4)如果i != 0,但是没有达到128,也会发送

    具体的发送:

    • 1)TripleCommandOutBoundHandler#write
    • 2)QueuedCommand#send
    • 3)DataQueueCommand#doSend

    DataQueueCommand#doSend

        public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
            if (data == null) {
                ctx.write(new DefaultHttp2DataFrame(endStream), promise);
            } else {
                ByteBuf buf = ctx.alloc().buffer();
                // 第一个字节记录请求体是否被压缩
                buf.writeByte(compressFlag);
                // 后四个字节记录请求体的长度
                buf.writeInt(data.length);
                // 真实的数据
                buf.writeBytes(data);
                // 发送
                ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
            }
        }
    

    总结一下:

    • 1)ClientCallToObserverAdapter#onNext
      1-1)ClientStream#sendHeader发送请求头,HeaderQueueCommand,endStream是false;
      1-2)ClientStream#writeMessage发送请求体,DataQueueCommand,endStream是false;
    • 2)ClientCallToObserverAdapter#onCompleted,DefaultHttp2DataFrame,endStream是true。

    4.3.2 服务提供者处理请求逻辑

    TripleHttp2FrameServerHandler#channelRead

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof Http2HeadersFrame) {
                onHeadersRead(ctx, (Http2HeadersFrame) msg);
            } else if (msg instanceof Http2DataFrame) {
                onDataRead(ctx, (Http2DataFrame) msg);
            } else if (msg instanceof ReferenceCounted) {
                // ignored
                ReferenceCountUtil.release(msg);
            }
        }
    

    处理请求头TripleHttp2FrameServerHandler#onHeadersRead

    • 1)收到一个Http2HeadersFrame时,生成一个ServerStream,和ClientStream对应,此处ctx.channel()拿到的是子Channel, 对应的是Http2StreamChannel,表示流
    • 2)ServerStream.ServerTransportObserver#processHeader
       A)各种校验
       B)获取接口名(服务名)serviceName以及方法名originalMethodName,根据(serviceName, group, version)从本地注册表pathResolver中取出Invoker。
       C)得到数据解压器TriDecoder,会解压数据,并把解压后的数据交给ServerDecoderListener进行处理
       D)创建ReflectionServerCall,传入进来的executor是SerializingExecutor,但是在构造方法里会再包一层,以call里的executor为 SerializingExecutor(SerializingExecutor(ThreadPoolExecutor))SerializingExecutor;并调用ReflectionServerCall#startCall
        D-1)ReflectionServerCall#doStartCall
        D-2)调用至ReflectionServerCall.ServerStreamListenerImpl#startCall
        D-3)调用至ReflectionServerCall.ServerStreamListenerImpl#trySetListener
        D-4)调用至ServerCall#startInternalCall,这里会创建ServerCall.Listener:
        UNARY:UnaryServerCallListener
        SERVER_STREAM:ServerStreamServerCallListener
        BI_STREAM或CLIENT_STREAM:BiStreamServerCallListener

    处理请求体TripleHttp2FrameServerHandler#onDataRead

    • 1)ServerStream.ServerTransportObserver#doOnData
    • 2)deframer.deframe(data);这里deframer就是TriDecoder
    • 3)TriDecoder#deframe处理请求体
    • 4)TriDecoder#deliver
      4-1)processHeader();处理请求体的前5个字节,第一个字节表示是否压缩;后四个字节表示请求体长度。
      4-2)processBody();处理实际发过来的数据。如果压缩了,先解压缩。
      ServerCall.ServerStreamListenerBase#onMessage
      ReflectionServerCall.ServerStreamListenerImpl#doOnMessage
       A)obj = packableMethod.getRequestUnpack().unpack(message);把解压之后的字节数据进行反序列化
       B)listener.onMessage(obj);
        UNARY:UnaryServerCallListener
        SERVER_STREAM:ServerStreamServerCallListener
        BI_STREAM或CLIENT_STREAM:BiStreamServerCallListener

    UNARY调用逻辑(同步阻塞)

    • 1)ClientCallToObserverAdapter#onNext发送请求头和请求体。
      服务提供者接收到请求体才执行:UnaryServerCallListener#onMessage这里invocation.setArguments()会把接收到的数据作为方法参数,但是还没有调用Invoker#invoke方法。
    • 2)ClientCallToObserverAdapter#onCompleted发送空的请求体,但是endStream是true。
      服务提供者:ServerStream.ServerTransportObserver#doOnData中,endStream是true,会调用deframer.close();
      TriDecoder#close,这里closing = true;
      -> TriDecoder#deliver()
      -> ServerStream.ServerTransportObserver.ServerDecoderListener#close
      -> ReflectionServerCall.ServerStreamListenerImpl#complete
      -> UnaryServerCallListener#onComplete 这里会调用invoke()
       A)会执行final Result response = invoker.invoke(invocation);
       B)onReturn(r.getValue());实际为UnaryServerCallListener#onReturn方法执行完后,把方法结果写回给客户端。
       B-1)responseObserver.onNext(value);
       B-2)responseObserver.onCompleted(TriRpcStatus.OK);

    SERVER_STREAM调用逻辑(异步)

    • 1)ClientCallToObserverAdapter#onNext发送请求头和请求体。
      服务提供者接收到请求体才执行:ServerStreamServerCallListener#onMessage这里invocation.setArguments(new Object[]{message, responseObserver});把接收到的数据和ServerCallToObserverAdapter作为方法调用的参数,ServerCallToObserverAdapter可以用来可以客户端发送数据。
    • 2)ServerStreamServerCallListener#onComplete也是调用invoke()。

    在Invoker#invoker里面就可以调用responseObserver向客户端发送响应。

        public void sayHelloServerStream(String name, StreamObserver<String> response) {
    
            response.onNext(name + " hello");
    
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            response.onNext(name + " world");
    
            response.onCompleted();
    
        }
    

    BI_STREAM或CLIENT_STREAM(异步)

    • 1)ClientCallToObserverAdapter#onNext发送请求头;
      服务提供者接收到请求头就执行,在BiStreamServerCallListener构造方法里面:
      1-1)invocation.setArguments(new Object[]{responseObserver});构造监听器的时候,把服务端流对象设置为业务方法参数;
      1-2)invoke(); 执行业务方法-->onReturn,在onReturn里面this.requestObserver = (StreamObserver<Object>) value;会将程序员创建的StreamObserver赋值给requestObserver。
    • 2)ClientCallToObserverAdapter#onNext发送请求体:
      BiStreamServerCallListener#onMessage调用requestObserver.onNext(message);也即程序员写的StreamObserver.onNext()。
        public StreamObserver<String> sayHelloBiStream(StreamObserver<String> response) {
            return new StreamObserver<String>() {
                @Override
                public void onNext(String name) {
                    System.out.println(name);
                    response.onNext("hello: "+name);
                }
    
                @Override
                public void onError(Throwable throwable) {
                }
    
                @Override
                public void onCompleted() {
                    System.out.println("completed");
                }
            };
        }
    

    4.3.3 客户端接收到响应TripleHttp2ClientResponseHandler

    TripleHttp2ClientResponseHandler#channelRead0

        protected void channelRead0(ChannelHandlerContext ctx, Http2StreamFrame msg) throws Exception {
            if (msg instanceof Http2HeadersFrame) {
                final Http2HeadersFrame headers = (Http2HeadersFrame) msg;
                transportListener.onHeader(headers.headers(), headers.isEndStream());
            } else if (msg instanceof Http2DataFrame) {
                final Http2DataFrame data = (Http2DataFrame) msg;
                transportListener.onData(data.content(), data.isEndStream());
            } else {
                super.channelRead(ctx, msg);
            }
        }
    

    ClientStream.ClientTransportListener#onData

            public void onData(ByteBuf data, boolean endStream) {
                executor.execute(() -> {
    
                    // transportError不等于null,表示处理响应头时就有问题了
                    if (transportError != null) {
                        transportError.appendDescription(
                            "Data:" + data.toString(StandardCharsets.UTF_8));
                        // 释放内存空间
                        ReferenceCountUtil.release(data);
                        //
                        if (transportError.description.length() > 512 || endStream) {
                            handleH2TransportError(transportError);
    
                        }
                        return;
                    }
    
                    if (!headerReceived) {
                        handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
                            "headers not received before payload"));
                        return;
                    }
    
                    // 接收到响应体数据后,把数据添加到accumulate中进行保存
                    deframer.deframe(data);
                });
            }
    

    ClientStream.ClientTransportListener#onData
    -> TriDecoder#deframe
    -> TriDecoder#deliver
    -> TriDecoder#processBody
    -> ClientCall.ClientStreamListenerImpl#onMessage
    -> listener.onMessage(unpacked);有两种方式:同步和异步
    UnaryClientCallListener#onMessage同步
    ObserverToClientCallListenerAdapter#onMessage异步

    ObserverToClientCallListenerAdapter#onMessage异步

        public void onMessage(Object message) {
            // 接收到一个响应结果,回调StreamObserver
            delegate.onNext(message);
            // 继续处理下一个响应结果
            if (call.isAutoRequestN()) {
                call.requestN(1);
            }
        }
    

    上面的delegate就是服务消费者这边创建的ZhouyuResultStreamObserver:

        static class ZhouyuResultStreamObserver implements StreamObserver<String> {
    
    
            @Override
            public void onNext(String data) {
                System.out.println(data);
            }
    
            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable);
            }
    
            @Override
            public void onCompleted() {
                System.out.println("complete");
            }
        }
    

    梳理一下客户端AbstractInvoker#invoke整个流程

    • 1)AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
      1-1)TripleInvoker#doInvoke
       A)UNARY,invokeUnary()返回是一个正常的CompletableFuture,未完成,后面就会阻塞。
       B)SERVER_STREAM,invokeServerStream()返回return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation);直接返回一个已经完成了的CompletableFuture,外层invoke阻塞的地方将直接通过,异步的效果。
       C)CLIENT_STREAM和BI_STREAM,invokeBiOrClientStream()返回的也是一个已经完成了的CompletableFuture。
    • 2) waitForResultIfSync(asyncResult, invocation);如果需要同步就阻塞
      2-1)responseFuture.get(timeout, unit),这里就会判断responseFuture,如果没有完成就阻塞。

    同步阻塞什么时候唤醒?

    • 服务提供者调用StreamObserver#onCompleted后,服务消费者就会调用到UnaryClientCallListener#onClose
    • 这里future.received(status, result);触发DeadlineFuture完毕(调用CompletableFuture#complete),解开TripleInvoker的invoke方法中的同步阻塞

    4.3.4 SERVER_STREAM的一个BUG

    服务提供者逻辑如下。

        public void sayHelloServerStream(String name, StreamObserver<String> response) {
    
            response.onNext(name + " hello");
    
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            response.onNext(name + " world");
    
            response.onCompleted();
    
        }
    

    预期是服务消费者拿到一个数据帧后,停3s,然后再拿到后续的数据。但是Dubbo这里是直接等3s,然后一下子拿到了所有三个数据。

    整体流程:

    • 服务提供者处理请求TripleHttp2FrameServerHandler#onDataRead
      1)executor.execute(() -> doOnData(data, endStream));这里线程池是SerializingExecutor
      2)利用一个线程去执行SerializingExecutor中的run方法,从runQueue获取任务执行,也就是执行上面的lambda表达式。
    • 服务提供者获取到请求体后就会调用invoker.invoke(invocation)执行具体的方法;
    • 在服务方法里面会调用responseObserver.onNext,也即ServerCallToObserverAdapter#onNext向客户端发送数据。
      1)executor.execute(writeMessage);将Runnable添加到队列中,此处的executor为SerializingExecutor(SerializingExecutor(ThreadPoolExecutor)),内部的SerializingExecutor(ThreadPoolExecutor)就是用来接收请求数据的。
      执行效果:
      A)首先将writeMessage加入到外面的SerializingExecutor队列中;
      B)然后执行里面SerializingExecutor的execute时,将writeMessage加入到里面SerializingExecutor的队列中,然后由于里面SerializingExecutor的状态已经改为了true,所以就没法执行下去,直接返回了,所以这了数据并没有发送出去。
    • 那什么时候数据才能发出去呢?就是要里面SerializingExecutor的状态改为false才可以,这时候需要整个方法sayHelloServerStream()都执行完后才行。

    建议更改:

    • ServerCall#writeMessage中executor.execute(writeMessage)直接改为doWriteMessage(message),不需要添加到线程池,直接发送即可。

    SerializingExecutor#execute

        public void execute(Runnable r) {
            // SerializingExecutor会对加入到runQueue中的Runnable用一个线程进行串行处理
    
            // 将Runnable任务添加到队列
            runQueue.add(r);
    
            // 使用内部线程池executor中的一个线程来运行
            schedule(r);
        }
    

    SerializingExecutor#schedule

        private void schedule(Runnable removable) {
            if (atomicBoolean.compareAndSet(false, true)) {
                boolean success = false;
                try {
                    // SerializingExecutor内部保护了一个线程池executor,这个线程池是根据服务url创建出来的
                    // 注意:这里并不是把runQueue队列中的Runnable任务拿出来用线程去执行
                    // 而是把SerializingExecutor自己作为一个Runnable交给线程池中的一个线程去执行
                    // 这里其实就是利用一个线程去执行SerializingExecutor中的run方法,从而获取runQueue中的任务进行执行
                    executor.execute(this);
                    success = true;
                } finally {
                    // It is possible that at this point that there are still tasks in
                    // the queue, it would be nice to keep trying but the error may not
                    // be recoverable.  So we update our state and propagate so that if
                    // our caller deems it recoverable we won't be stuck.
                    if (!success) {
                        if (removable != null) {
                            // This case can only be reached if 'this' was not currently running, and we failed to
                            // reschedule.  The item should still be in the queue for removal.
                            // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
                            // throw if the item to remove is null.  If removable is present in the queue twice,
                            // the wrong one may be removed.  It doesn't seem possible for this case to exist today.
                            // This is important to run in case of RejectedExecutionException, so that future calls
                            // to execute don't succeed and accidentally run a previous runnable.
                            runQueue.remove(removable);
                        }
                        atomicBoolean.set(false);
                    }
                }
            }
        }
    

    SerializingExecutor#run

        public void run() {
            Runnable r;
            try {
                while ((r = runQueue.poll()) != null) {
                    try {
                        r.run();
                    } catch (RuntimeException e) {
                        LOGGER.error("Exception while executing runnable " + r, e);
                    }
                }
            } finally {
                atomicBoolean.set(false);
            }
    
            // 如果队列中不为空,则继续获取一个线程执行run(),继续获取队列中的任务进行执行
            if (!runQueue.isEmpty()) {
                // we didn't enqueue anything but someone else did.
                schedule(null);
            }
        }
    

    相关文章

      网友评论

          本文标题:Dubbo 3.0源码剖析

          本文链接:https://www.haomeiwen.com/subject/ygbokdtx.html