美文网首页
Dubbo 3.0源码剖析

Dubbo 3.0源码剖析

作者: 王侦 | 来源:发表于2023-02-24 17:00 被阅读0次

1.手写模拟Dubbo

  • 定义一个接口,服务提供者需要实现该接口。该接口需要打成一个jar包,然后服务提供者和调用者都要依赖该接口的jar包。
  • Dubbo需要处理Http请求(HttpServer)、Dubbo请求
  • 服务提供者需要启动HttpServer,比如Tomcat还要添加DispatchServlet
  • DispatchServlet#service调用Handler处理请求,服务调用者RPC调用服务提供者,需要传递的参数包括(封装成Invocation,序列化与反序列化统一为JDK序列化方式):
     接口名
     方法名
     方法参数类型列表
     方法参数值列表
     版本号
  • 服务提供者从请求里面拿到Invocation对象后,需要找到对应接口实现类的执行方法。那到底可以调用哪些实现类呢?服务提供者这边需要对可以向外暴露的实现类进行本地注册。
    然后根据Invocation中的接口拿到实现类,然后使用Invocation方法名、参数调用实现类对应的方法。
  • 如果服务提供者对应接口有多个版本的实现类,则Invocation需要加入版本号,然后通过指定版本号执行指定版本的实现类。
  • 服务调用者需要远程调用,比如HttpClient,对Invocation进行JDK序列化,发送请求,然后接收响应。
  • 服务调用者升级)可直接拿到接口的动态代理实现类,然后直接通过该实现类进行RPC调用。把RPC调用逻辑封装到该代理实现类里面。ProxyFactory#getProxy获取JDK动态代理,将RPC逻辑封装到InvocationHandler#invoke里面去。
  • 注册中心+负载均衡)服务提供者地址:ip + port需要可配置,而不是写死。需要根据服务名称可以直接获取到ip + port。这其实就是注册中心的功能,服务提供者需要向注册中心进行远程注册,存入Map<接口名,List<URL>>。然后服务调用者从注册中心拉取服务提供者信息,然后进行负载均衡选取一个服务提供者的URL,然后进行RPC调用。
  • 服务容错+重试服务)调用服务出错,调用指定的容错类方法,该方法的逻辑非常灵活,可以返回错误说明等。某个服务调用失败,可以进行指定次数的重试,下次重试调用要避开失败的服务实例。
  • Mock机制)模拟、伪造服务提供者。
  • 支持多协议)比如支持Tomcat以及Netty。抽象出一个接口Protocol,start()启动协议,send()基于协议发送数据。然后DubboProtocol(基于Netty)和HttpProtocol(基于Tomcat)都要实现Protocol接口。然后再提供一个ProtocolFactory工厂,根据传入的类型创建对应的协议。最后通过配置动态切换协议,程序代码不用变。(更进一步)服务提供者向注册中心注册的时候,带上协议,也即URL里面加入一个protocol。服务调用者可以根据拉取到的服务实例中URL的protocol创建对应的协议进行RPC调用。
  • 优化升级,向Dubbo靠拢
    1)URL除了protocol,hostname,port,再增加两个信息:interfaceName以及implClass。
    2)Protocol接口更改:export和refer方法。
    3)实现类DubboProtocol和HttpProtocol。
    服务提供者调用DubboProtocol#export向本地以及远程注册URL,并且启动Netty;服务调用者调用DubboProtocol#refer生成DubboInvoker对象。
    HttpProtocol#export向本地及远程注册URL,并且启动HttpServer;HttpProtocol#refer生成HttpInvoker对象。
    4)Invoker接口:invoke(Invocation);
    5)DubboInvoker及HttpInvoker实现Invoker接口。DubboInvoker通过netty实现调用,参数包括url及Invocation。HttpInvoker通过HttpClient实现调用,参数包括url及Invocation。
    6)CulsterInvoker也实现了Invoker接口,持有List<Invoker>。CulsterInvoker#join,根据服务接口名称获取List<URL>,然后对每一个URL生成对应的Invoker,加入到List<Invoker>里面。然后在CulsterInvoker#invoke里面进行负载均衡调用。

总结:

  • 1)核心是Protocol
  • 2)服务提供者,构建URL,然后根据URL中的Protocol获取实际的Protocol,调用其export进行服务导出。向本地以及远程注册URL,并且启动相应的服务器(Netty或者Tomcat)。
  • 3)服务调用者,动态代理,其InvocationHandler#invoke中首先创建Invocation,然后根据Invocation中的服务接口名从注册中心拿到对应的List<URL>,然后对每个URL根据其协议生成对应的Invoker(DubboInvoker或者HttpInvoker)。Invoker会根据对应的协议使用对应的客户端进行RPC调用,传递的参数就是Invocation。
  • 4)服务提供者,NettyServerHandler#channelRead、HttpServerHandler#handle会从参数Invocation根据服务接口名、版本号获取对应的实现服务(本地服务注册表),然后根据方法名、方法参数进行实际调用。

2.服务导出

几个核心步骤:

  • 1)构建服务URL(协议、ip、port)
  • 2)启动Tomcat/Netty,要先启动,后注册,否则调用端拉到URL,提供端没启动,就没法调通
  • 3)向注册中心注册(接口名:URL)

Dubbo接口级的服务注册 vs SpringCloud应用级的服务注册:


@EnableDubbo
-> @EnableDubboConfig
-> @Import(DubboConfigConfigurationRegistrar.class)
-> DubboConfigConfigurationRegistrar是个ImportBeanDefinitionRegistrar,调用registerBeanDefinitions()
-> DubboSpringInitializer#initialize
-> DubboBeanUtils#registerCommonBeans
-> 注册DubboDeployApplicationListener监听器监听ContextRefreshedEvent事件。
-> DubboDeployApplicationListener#onContextRefreshedEvent
-> DefaultModuleDeployer#start
1)服务导出exportServices();
2)服务引入referServices();
3)应用级注册onModuleStarted();

DubboDeployApplicationListener#onApplicationEvent,监听ContextRefreshedEvent事件。

    public void onApplicationEvent(ApplicationContextEvent event) {
        if (nullSafeEquals(applicationContext, event.getSource())) {
            if (event instanceof ContextRefreshedEvent) {
                onContextRefreshedEvent((ContextRefreshedEvent) event);
            } else if (event instanceof ContextClosedEvent) {
                onContextClosedEvent((ContextClosedEvent) event);
            }
        }
    }

2.1 Dubbo3.0之前接口级的服务注册

注册过程:

  • 1)@DubboService,解析注解得到实现类UserServiceImpl和接口UserService,生成一个对象ServiceConfig(包含UserService、UserServiceImpl、version等)
  • 2)ServiceConfig#export进行服务导出
    2-1)确定协议(应用配置),生成URL;
    2-2)启动Netty、Jetty;
    2-3)URL存到注册中心;

应用的每个接口都会注册到注册中心里面:


2.2 Dubbo3.0应用级的服务注册

Dubbo3.0为了兼容之前版本,既会进行接口级服务注册,也会进行应用级的服务注册。

注册过程:

  • 1)所有接口服务注册完成;(可以关掉register-mode:instance,只进行应用级注册);
  • 2)所有接口会向注册中心mapping注册一个<接口名:实例名(应用名)>
  • 3)(默认没有进行注册,metadata-type改为remote才启用)会将应用元数据信息(应用提供了哪些Dubbo服务接口)存到元数据中心的metadata(元数据中心和注册中心可以共用一个,也可以分开);(metadata-type为local启动服务接口,在这里进行导出)另外一个方法,服务提供者会提供一个元数据Dubbo服务接口MetadataService,消费者可以调用该服务的getMetadataInfo获取元数据信息。
  • 4)然后才进行应用级注册,只用注册一次,应用名: 实例ip+port,这里port默认取Dubbo协议端口,如果没有Dubbo则取第一个协议端口(ServiceInstanceHostPortCustomizer#customize);这里还会存值,包括dubbo.endpoints(port, protocol)。

如果服务暴露了Triple协议+20880端口,并且metadata-type为local,则应用级注册为:ip + 20881(优先是Dubbo的ip+port,这里是MetadataService)。因为此时MetadataService会暴露为Dubbo协议+20881(默认的20880被占用)。

怎么判断端口是否被占用?

  • MetadataService -> ServiceConfig#export -> NetUtils#getAvailablePort会对port进行遍历,new ServerSocket(i),如果抛异常,则表示被占用,则尝试i++。

只根据应用级注册的信息,服务消费者(调用者)怎么办?

  • 1)根据服务名(接口)可以拿到实例 ip+port。有两个问题,问题一,你怎么根据接口名找到应用名(实例名);问题二,你怎么确定该服务提供者是否导出了?
  • 2)问题一,就是通过注册时,注册<接口名:实例名(应用名)>解决;
  • 3)问题二,通过元数据中心metadata可以查到该应用实例提供了哪些接口服务、协议、超时时间(然后就可以生成对应的Invoker,比如DubboInvoker、TripleInvoker)

那这里,如果应用配置相同的协议(dubbo)有两个端口,在dubbo.endpoints里怎么处理?

  • 只会存一个,这里是个bug,放到Map<Protocol, Port>,这里会覆盖。
  • ProtocolPortsMetadataCustomizer#customize
  • 因此服务消费者只会生成一个URL,但是服务导出有两个URL。

如果配置Triple协议,有两个端口,强制走接口FORCE_INTERFACE,会有另外一个Bug,也只会调用一个端口,另一个端口无法调用。

  • ServiceConfig#doExportUrl,根据接口、实现类对象ref、服务URL生成一个Invoker;JavassistProxyFactory,执行服务接口方法时,最后都会调用到具体实现类ref的相应方法。
  • TripleProtocol#export服务导出时,会进行本地注册,放到path2Invoker.put(path, invoker),接口名字 -> invoker(实现类),这里如果协议相同、端口不同,第二个URL生成的Invoker会覆盖第一个URL的Invoker,本地注册有问题。
  • 那DubboProtocol#export为什么没有问题,因为本地注册时,它的key生成时,包含了端口号,而不仅仅是接口名字。

元数据中心特点:

  • 1)数据不怎么变化
  • 2)这里面的数据跟服务消费者(调用者)没有太大的关系

如上图,应用级注册的值里面也会标识metadata是local还是remote,也就是指示服务消费者是到元数据中心查询还是通过Dubbo调用服务提供者的MetadataService来获取元数据信息。

另外还有一个重要应用,通过元数据信息只能查到<接口/服务, 协议>,是没有端口的,那这个端口在哪里?就在应用级注册的值里面:

  • metadata -> dubbo.endpoints里面,会有port以及对应的协议。

1)元数据默认方式:

  • 服务提供者提供MetadataService,会向注册中心进行注册,用的老一套的接口级注册


2)元数据向元数据中心注册的方式:


3.服务引入

@DubboReference
-> ReferenceConfig
-> ReferenceConfig#get 生成代理对象
-> RegistryProtocol#refer
-> RegistryProtocol#doRefer 生成MigrationInvoker
A)FORCE_INTERFACE,强制使用接口级服务引入
B)FORCE_APPLICATION,强制使用应用级服务引入
C)APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个

总结一下整体服务消费者(调用者)的查找流程:

  • step1.根据服务名/接口名DemoService,去dubbo/mapping查找应用名
  • step2.根据应用名dubbo-springboot-demo-provider去应用级注册信息services查找实例,可能有多个(192.168.65.61:20881),然后找出实例值里面的dubbo.metadata.storage-type(local后者remote)以及dubbo.endpoints(port, protocol)。
  • step3.根据dubbo.metadata.storage-type查找实例元数据信息(远程就是元数据中心,如果共用zk,就是dubbo/metadata;如果是local,就是调用dubbo服务MetadataService),获取到服务(接口)对应的协议,例如DemoService:tri
  • step4.找到服务接口 + 协议后,根据step2中获取到的dubbo.endpoints,就能获取到服务接口+协议对应的端口
  • step5.最后遍历所有实例,生成URL(跟dubbo2.7一样),然后过滤(比如@DubboReference(protocol = "dubbo")),最终生成对应的Invoker(TripleInvoke和DubooInvoker),包装成ClusterInvoker,最后生成接口DemoServcie的代理对象。
    tri://实例ip:对应端口/DemoService
    dubbo://实例ip:对应端口/DemoService
  • step6.调用
    ClusterInvoker#invoke -> TripleInvoker#invoke -> 使用指定的协议发送请求到服务提供者:实例ip + triple协议端口

ServiceDiscoveryRegistryDirectory#subscribe
-> ServiceDiscoveryRegistry#doSubscribe
-> serviceNameMapping.getAndListen(),获取接口对应的应用名
-> serviceDiscovery.getInstances() 根据应用名,从/services/应用名节点查出所有实例
-> serviceDiscovery.getRemoteMetadata(),获取应用的元数据(从元数据中心或元数据服务获取)

服务消费者监听什么变化?

  • 监听 /dubbo/services/应用名节点 下面的变化,就是服务实例重启后发生的变化
  • ServiceInstancesChangedListener#doOnEvent
    调用serviceDiscovery.getRemoteMetadata(),重新去获取应用的元数据
  • 另外缓存metaCacheManager有过期机制,会更新

MigrationInvoker

# dubbo.application.service-discovery.migration 仅支持通过 -D 以及 全局配置中心 两种方式进行配置。
dubbo.application.service-discovery.migration=APPLICATION_FIRST

# 可选值 
# FORCE_INTERFACE,强制使用接口级服务引入
# FORCE_APPLICATION,强制使用应用级服务引入
# APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个

事实上,在进行某个服务的服务引入时,会统一利用InterfaceCompatibleRegistryProtocol的refer来生成一个MigrationInvoker对象,在MigrationInvoker中有三个属性:

private volatile ClusterInvoker<T> invoker;  // 用来记录接口级ClusterInvoker
private volatile ClusterInvoker<T> serviceDiscoveryInvoker; // 用来记录应用级的ClusterInvoker
private volatile ClusterInvoker<T> currentAvailableInvoker; // 用来记录当前使用的ClusterInvoker,要么是接口级,要么应用级

RegistryProtocol#doRefer

  • migrationInvoke = getMigrationInvoker()
  • RegistryProtocol#interceptInvoker()
    -> listener.onRefer()
    -> MigrationRuleListener#onRefer
    -> MigrationRuleHandler#doMigrate
    -> MigrationInvoker#migrateToApplicationFirstInvoker
    -> refreshInterfaceInvoker() 接口级ClusterInvoker (2.7版本)
     refreshServiceDiscoveryInvoker 应用级ClusterInvoker (3.0版本)
     calcPreferredInvoker()选择哪个ClusterInvoker的逻辑

MigrationInvoker#invoke

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // currentAvailableInvoker要么是接口级ClusterInvoker,要么是应用级ClusterInvoker
        if (currentAvailableInvoker != null) {
            if (step == APPLICATION_FIRST) {

                // call ratio calculation based on random value
                // 在同时支持接口级和应用级的情况下,如果promotion小于100,则每次调用时,生成一个100以内的随机数,如果随机数大于promotion,则走接口级ClusterInvoker进行服务调用
                // 表示支持部分走接口级调用,部分走应用级调用,看随机数
                // promotion默认等于100,所以默认不会支持部分
                if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                    return invoker.invoke(invocation);
                }
            }

            return currentAvailableInvoker.invoke(invocation);
        }

        switch (step) {
            case APPLICATION_FIRST:
                if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
                    currentAvailableInvoker = serviceDiscoveryInvoker;
                } else if (checkInvokerAvailable(invoker)) {
                    currentAvailableInvoker = invoker;
                } else {
                    currentAvailableInvoker = serviceDiscoveryInvoker;
                }
                break;
            case FORCE_APPLICATION:
                currentAvailableInvoker = serviceDiscoveryInvoker;
                break;
            case FORCE_INTERFACE:
            default:
                currentAvailableInvoker = invoker;
        }

        return currentAvailableInvoker.invoke(invocation);
    }

4.服务调用

4.1 服务导出TripleProtocol协议的流程

TripleProtocol#export

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        checkProtobufVersion(url);
        // 本地注册--->实例类
        String key = serviceKey(url);

        // 服务导出器,用来卸载服务时做一些善后处理
        final AbstractExporter<T> exporter = new AbstractExporter<T>(invoker) {
            @Override
            public void afterUnExport() {
                pathResolver.remove(url.getServiceKey());
                pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(),
                    invoker);
                // set service status
                triBuiltinService.getHealthStatusManager()
                    .setStatus(url.getServiceKey(), ServingStatus.NOT_SERVING);
                triBuiltinService.getHealthStatusManager()
                    .setStatus(url.getServiceInterface(), ServingStatus.NOT_SERVING);
                exporterMap.remove(key);
            }
        };

        exporterMap.put(key, exporter);

        invokers.add(invoker);

        pathResolver.add(url.getServiceKey(), invoker); // url 20882
        pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);

        // set service status
        triBuiltinService.getHealthStatusManager()
            .setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
        triBuiltinService.getHealthStatusManager()
            .setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);

        // 启动服务器,用来处理HTTP2的请求
        PortUnificationExchanger.bind(invoker.getUrl());
        return exporter;
    }

PortUnificationExchanger#bind

    public static void bind(URL url) {
        // servers表示可以同时运行多个PortUnificationServer,只需要绑定的host+port不一样即可
        servers.computeIfAbsent(url.getAddress(), addr -> {
            final PortUnificationServer server = new PortUnificationServer(url);
            // 运行NettyServer,并绑定ip和port
            server.bind();
            return server;
        });
    }
    public void bind() {
        if (channel == null) {
            doOpen();
        }
    }
    protected void doOpen() {
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            EVENT_LOOP_WORKER_POOL_NAME);

        bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    // ch是Socket连接
                    final ChannelPipeline p = ch.pipeline();
//                        p.addLast(new LoggingHandler(LogLevel.DEBUG));

                    final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
                    if (enableSsl) {
                        p.addLast("negotiation-ssl", new SslServerTlsHandler(getUrl()));
                    }

                    // 初始化SocketChannel,并在pipeline中绑定PortUnificationServerHandler
                    final PortUnificationServerHandler puHandler = new PortUnificationServerHandler(url, protocols);
                    p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
                    p.addLast("negotiation-protocol", puHandler);
                    channelGroup = puHandler.getChannels();
                }
            });
        // bind

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        InetSocketAddress bindAddress = new InetSocketAddress(bindIp, bindPort);
        ChannelFuture channelFuture = bootstrap.bind(bindAddress);
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

核心是PortUnificationServerHandler。


ByteToMessageDecoder#channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                try {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        discardSomeReadBytes();
                    }

                    int size = out.size();
                    firedChannelRead |= out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                } finally {
                    out.recycle();
                }
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

ByteToMessageDecoder#callDecode

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }
    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                fireChannelRead(ctx, out, out.size());
                out.clear();
                handlerRemoved(ctx);
            }
        }
    }

这里会调用PortUnificationServerHandler#decode

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
       // ctx.channel() 拿到的是NioSocketChannel 接收到数据ByteBuf后,会先解码,然后再触发Handler的channelRead

        // 根据前5个字节确定对应的协议
        // Will use the first five bytes to detect a protocol.
        if (in.readableBytes() < 5) {
            return;
        }

        // 看是不是HTTP2.0
        for (final WireProtocol protocol : protocols) {
            in.markReaderIndex();
            final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
            in.resetReaderIndex();
            switch (result) {
                case UNRECOGNIZED:
                    continue;
                case RECOGNIZED:
                    // 符合个某个协议后,再給Socket连接对应的pipeline绑定Handler
                    protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
                    ctx.pipeline().remove(this);
                case NEED_MORE_DATA:
                    return;
                default:
                    return;
            }
        }
        // Unknown protocol; discard everything and close the connection.
        in.clear();
        ctx.close();
    }

总结一下后续流程:

  • Http2ProtocolDetector#detect,如何要建立的是一个HTTP2连接,那么在建立完Socket连接后,客户端会发送一个连接前言,也就是一串字节(对应的字符串为:“PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n”),给到服务端,服务端从而知道要建立的是一个HTTP2的连接
  • 符合HTTP2协议后,会删除本身的Handler,通过TripleHttp2Protocol#configServerPipeline添加处理Triple协议的Handler
    1)Http2FrameCodec,配置HTTP2,比如max-concurrent-streams、max-frame-size;
    2)TripleServerConnectionHandler用来处理Http2PingFrame、Http2GoAwayFrame;
    3)Http2MultiplexHandler是用来创建子Channel的,并且ChannelInitializer是用来初始化子Channel,一个Socket连接对应一个NioSocketChannel,下层可以设置多个子Channel,每个子Channel对应一个HTTP2Stream(TripleCommandOutBoundHandler:将QueuedCommand转换成Http2StreamFrame,然后再发出去;TripleHttp2FrameServerHandler:处理的是HTTP2Stream所对应的子Channel,核心Handler, 用来处理Http2HeadersFrame、Http2DataFrame, lookupExecutor会根据服务url得到一个线程池,每个子Channel对应一个线程池?还是共享一个线程池?默认是共享一个。)
    4)TripleTailHandler释放ByteBuf的内存空间

重点看一下TripleHttp2Protocol#configServerPipeline

        // Http2MultiplexHandler是用来创建子Channel的,并且ChannelInitializer是用来初始化子Channel
        // 一个Socket连接对应一个NioSocketChannel,下层可以设置多个子Channel,每个子Channel对应一个HTTP2Stream
        final Http2MultiplexHandler handler = new Http2MultiplexHandler(

            // Channel初始化器,用来初始化传入进来的Channel,比如HTTP2Stream所对应的Channel
            new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) {
                    final ChannelPipeline p = ch.pipeline();
                    // 将QueuedCommand转换成Http2StreamFrame,然后再发出去
                    p.addLast(new TripleCommandOutBoundHandler());

                    // TripleHttp2FrameServerHandler处理的是HTTP2Stream所对应的子Channel
                    // 核心Handler, 用来处理Http2HeadersFrame、Http2DataFrame, lookupExecutor会根据服务url得到一个线程池
                    // 每个子Channel对应一个线程池?还是共享一个线程池?默认是共享一个
                    p.addLast(new TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
                        filters));
                }
            });

核心是TripleHttp2FrameServerHandler。

4.2 服务引入TripleProtocol协议的流程

    public TripleInvoker(Class<T> serviceType,
        URL url,
        String acceptEncodings,
        ConnectionManager connectionManager,
        Set<Invoker<?>> invokers,
        ExecutorService streamExecutor) {
        super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY, TOKEN_KEY});
        this.invokers = invokers;
        // 与服务提供者建立Socket连接
        this.connection = connectionManager.connect(url);
        this.acceptEncodings = acceptEncodings;
        this.streamExecutor = streamExecutor;
    }

MultiplexProtocolConnectionManager#connect

    public Connection connect(URL url) {
        // 协议相同的URL将对应同一个ConnectionManager
        final ConnectionManager manager = protocols.computeIfAbsent(url.getProtocol(), this::createSingleProtocolConnectionManager);
        return manager.connect(url);
    }

SingleProtocolConnectionManager#connect

    public Connection connect(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // 不同的address对应不同的Connection对象
        return connections.compute(url.getAddress(), (address, conn) -> {
            if (conn == null) {
                final Connection created = new Connection(url);
                created.getClosePromise().addListener(future -> connections.remove(address, created));
                return created;
            } else {
                conn.retain();
                return conn;
            }
        });
    }
    public Connection(URL url) {
        url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
        url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
        this.url = url;
        this.protocol = ExtensionLoader.getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
        this.remote = getConnectAddress();
        // 只是创建一个Bootstrap对象,并不会建立Socket连接
        this.bootstrap = create();
    }
    private Bootstrap create() {
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP.get())
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .remoteAddress(remote)
                .channel(socketChannelClass());

        final ConnectionHandler connectionHandler = new ConnectionHandler(this);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                SslContext sslContext = null;
                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    pipeline.addLast("negotiation", new SslClientTlsHandler(url));
                }

                //.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                // TODO support IDLE
//                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                pipeline.addLast(connectionHandler);
                protocol.configClientPipeline(url, pipeline, sslContext);
                // TODO support Socks5
            }
        });
        return bootstrap;
    }

构造TripleInvoker,创建connection只是创建一个Bootstrap对象,并不会建立Socket连接。

4.3 调用逻辑

4.3.1 服务消费者发送请求

那Socket连接什么时候创建呢?

  • 第一次使用TripleInvoker#invoke
    -> TripleInvoker#doInvoke
    -> Connection#isAvailable
    -> Connection#connect
    -> Bootstrap#connect()
 StreamObserver<String> streamObserver = demoService.sayHelloBiStream(new ZhouyuResultStreamObserver());
 streamObserver.onNext("zhouyu1");
 streamObserver.onCompleted();

TripleInvoker#doInvoke

  • 1)检查Socket连接是否可用,如果不可用并且没有初始化,那就连接服务端创建Socket连接
  • 2)拿到服务接口信息和当前调用的方法信息
  • 3)根据方法区分调用类型
    UNARY -> invokeUnary();
    SERVER_STREAM -> invokeServerStream();
    CLIENT_STREAM/ BI_STREAM -> invokeBiOrClientStream();

TripleInvoker#invokeBiOrClientStream

  • 1)拿到处理响应的responseObserver,也就是上面调用方法时传入的ZhouyuResultStreamObserver;
  • 2)创建requestObserver,streamCall方法中就会创建一个Stream,并且返回一个StreamObserver对象,可以利用这个requestObserver来向Stream中发送数据,将responseObserver封装为ClientCall.Listener,ClientCall.Listener是用来接收响应数据的。

TripleInvoker#streamCall

  • 将responseObserver封装为ClientCall.Listener
  • call.start()返回一个ClientStreamObserver,用来发送数据

ClientCall#start

  • this.stream = new ClientStream(),在构造方法里面, this.writeQueue = createWriteQueue(parent);

ClientStream#createWriteQueue

    private WriteQueue createWriteQueue(Channel parent) {
        // 利用Netty开启一个Http2StreamChannel,也就是HTTP2中的流
        final Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
        final Future<Http2StreamChannel> future = bootstrap.open().syncUninterruptibly();
        if (!future.isSuccess()) {
            throw new IllegalStateException("Create remote stream failed. channel:" + parent);
        }

        // 并绑定两个Handler,一个工作在发送数据时,一个工作在接收数据时
        final Http2StreamChannel channel = future.getNow();
        channel.pipeline()
            .addLast(new TripleCommandOutBoundHandler())
            // TripleHttp2ClientResponseHandler是用来接收响应的
            .addLast(new TripleHttp2ClientResponseHandler(createTransportListener()));

        // 基于Http2StreamChannel创建一个WriteQueue
        // 后续把要发送的数据,添加到WriteQueue中就能发送出去了
        return new WriteQueue(channel);
    }

当用户程序调用streamObserver.onNext()发送数据时,实际调用的是:

  • ClientCallToObserverAdapter#onNext
  • ClientCall#sendMessage
    1)发送请求头
    2)发送请求体
     把要发送的message进行序列化,得到字节数组;
     看是否需要压缩数据;
     stream.writeMessage(compress, compressed);发送数据,发送的是请求体,在请求体的最开始最记录当前请求体是否被压缩,压缩只会的数据长度是多少(DataQueueCommand,表示HTTP2中的Http2DataFrame,用的就是gRPC发送请求体的格式);

发送的核心逻辑WriteQueue#enqueue(QueuedCommand):

  • queue.add(command);这里的queue是ConcurrentLinkedQueue
  • scheduleFlush();调用channel.eventLoop().execute(this::flush);
  • 发送逻辑在WriteQueue#flush
    1)while循环从queue.poll()获取发送数据cmd
    2)cmd.run(channel);将数据帧添加到Http2StreamChannel中,添加并不会立马发送,调用了flush才发送
    3)连续从队列中取到了128个数据帧就flush一次,channel.flush();这里处理高并发情况,128就发送一次
    4)如果i != 0,但是没有达到128,也会发送

具体的发送:

  • 1)TripleCommandOutBoundHandler#write
  • 2)QueuedCommand#send
  • 3)DataQueueCommand#doSend

DataQueueCommand#doSend

    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
        if (data == null) {
            ctx.write(new DefaultHttp2DataFrame(endStream), promise);
        } else {
            ByteBuf buf = ctx.alloc().buffer();
            // 第一个字节记录请求体是否被压缩
            buf.writeByte(compressFlag);
            // 后四个字节记录请求体的长度
            buf.writeInt(data.length);
            // 真实的数据
            buf.writeBytes(data);
            // 发送
            ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
        }
    }

总结一下:

  • 1)ClientCallToObserverAdapter#onNext
    1-1)ClientStream#sendHeader发送请求头,HeaderQueueCommand,endStream是false;
    1-2)ClientStream#writeMessage发送请求体,DataQueueCommand,endStream是false;
  • 2)ClientCallToObserverAdapter#onCompleted,DefaultHttp2DataFrame,endStream是true。

4.3.2 服务提供者处理请求逻辑

TripleHttp2FrameServerHandler#channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Http2HeadersFrame) {
            onHeadersRead(ctx, (Http2HeadersFrame) msg);
        } else if (msg instanceof Http2DataFrame) {
            onDataRead(ctx, (Http2DataFrame) msg);
        } else if (msg instanceof ReferenceCounted) {
            // ignored
            ReferenceCountUtil.release(msg);
        }
    }

处理请求头TripleHttp2FrameServerHandler#onHeadersRead

  • 1)收到一个Http2HeadersFrame时,生成一个ServerStream,和ClientStream对应,此处ctx.channel()拿到的是子Channel, 对应的是Http2StreamChannel,表示流
  • 2)ServerStream.ServerTransportObserver#processHeader
     A)各种校验
     B)获取接口名(服务名)serviceName以及方法名originalMethodName,根据(serviceName, group, version)从本地注册表pathResolver中取出Invoker。
     C)得到数据解压器TriDecoder,会解压数据,并把解压后的数据交给ServerDecoderListener进行处理
     D)创建ReflectionServerCall,传入进来的executor是SerializingExecutor,但是在构造方法里会再包一层,以call里的executor为 SerializingExecutor(SerializingExecutor(ThreadPoolExecutor))SerializingExecutor;并调用ReflectionServerCall#startCall
      D-1)ReflectionServerCall#doStartCall
      D-2)调用至ReflectionServerCall.ServerStreamListenerImpl#startCall
      D-3)调用至ReflectionServerCall.ServerStreamListenerImpl#trySetListener
      D-4)调用至ServerCall#startInternalCall,这里会创建ServerCall.Listener:
      UNARY:UnaryServerCallListener
      SERVER_STREAM:ServerStreamServerCallListener
      BI_STREAM或CLIENT_STREAM:BiStreamServerCallListener

处理请求体TripleHttp2FrameServerHandler#onDataRead

  • 1)ServerStream.ServerTransportObserver#doOnData
  • 2)deframer.deframe(data);这里deframer就是TriDecoder
  • 3)TriDecoder#deframe处理请求体
  • 4)TriDecoder#deliver
    4-1)processHeader();处理请求体的前5个字节,第一个字节表示是否压缩;后四个字节表示请求体长度。
    4-2)processBody();处理实际发过来的数据。如果压缩了,先解压缩。
    ServerCall.ServerStreamListenerBase#onMessage
    ReflectionServerCall.ServerStreamListenerImpl#doOnMessage
     A)obj = packableMethod.getRequestUnpack().unpack(message);把解压之后的字节数据进行反序列化
     B)listener.onMessage(obj);
      UNARY:UnaryServerCallListener
      SERVER_STREAM:ServerStreamServerCallListener
      BI_STREAM或CLIENT_STREAM:BiStreamServerCallListener

UNARY调用逻辑(同步阻塞)

  • 1)ClientCallToObserverAdapter#onNext发送请求头和请求体。
    服务提供者接收到请求体才执行:UnaryServerCallListener#onMessage这里invocation.setArguments()会把接收到的数据作为方法参数,但是还没有调用Invoker#invoke方法。
  • 2)ClientCallToObserverAdapter#onCompleted发送空的请求体,但是endStream是true。
    服务提供者:ServerStream.ServerTransportObserver#doOnData中,endStream是true,会调用deframer.close();
    TriDecoder#close,这里closing = true;
    -> TriDecoder#deliver()
    -> ServerStream.ServerTransportObserver.ServerDecoderListener#close
    -> ReflectionServerCall.ServerStreamListenerImpl#complete
    -> UnaryServerCallListener#onComplete 这里会调用invoke()
     A)会执行final Result response = invoker.invoke(invocation);
     B)onReturn(r.getValue());实际为UnaryServerCallListener#onReturn方法执行完后,把方法结果写回给客户端。
     B-1)responseObserver.onNext(value);
     B-2)responseObserver.onCompleted(TriRpcStatus.OK);

SERVER_STREAM调用逻辑(异步)

  • 1)ClientCallToObserverAdapter#onNext发送请求头和请求体。
    服务提供者接收到请求体才执行:ServerStreamServerCallListener#onMessage这里invocation.setArguments(new Object[]{message, responseObserver});把接收到的数据和ServerCallToObserverAdapter作为方法调用的参数,ServerCallToObserverAdapter可以用来可以客户端发送数据。
  • 2)ServerStreamServerCallListener#onComplete也是调用invoke()。

在Invoker#invoker里面就可以调用responseObserver向客户端发送响应。

    public void sayHelloServerStream(String name, StreamObserver<String> response) {

        response.onNext(name + " hello");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.onNext(name + " world");

        response.onCompleted();

    }

BI_STREAM或CLIENT_STREAM(异步)

  • 1)ClientCallToObserverAdapter#onNext发送请求头;
    服务提供者接收到请求头就执行,在BiStreamServerCallListener构造方法里面:
    1-1)invocation.setArguments(new Object[]{responseObserver});构造监听器的时候,把服务端流对象设置为业务方法参数;
    1-2)invoke(); 执行业务方法-->onReturn,在onReturn里面this.requestObserver = (StreamObserver<Object>) value;会将程序员创建的StreamObserver赋值给requestObserver。
  • 2)ClientCallToObserverAdapter#onNext发送请求体:
    BiStreamServerCallListener#onMessage调用requestObserver.onNext(message);也即程序员写的StreamObserver.onNext()。
    public StreamObserver<String> sayHelloBiStream(StreamObserver<String> response) {
        return new StreamObserver<String>() {
            @Override
            public void onNext(String name) {
                System.out.println(name);
                response.onNext("hello: "+name);
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
    }

4.3.3 客户端接收到响应TripleHttp2ClientResponseHandler

TripleHttp2ClientResponseHandler#channelRead0

    protected void channelRead0(ChannelHandlerContext ctx, Http2StreamFrame msg) throws Exception {
        if (msg instanceof Http2HeadersFrame) {
            final Http2HeadersFrame headers = (Http2HeadersFrame) msg;
            transportListener.onHeader(headers.headers(), headers.isEndStream());
        } else if (msg instanceof Http2DataFrame) {
            final Http2DataFrame data = (Http2DataFrame) msg;
            transportListener.onData(data.content(), data.isEndStream());
        } else {
            super.channelRead(ctx, msg);
        }
    }

ClientStream.ClientTransportListener#onData

        public void onData(ByteBuf data, boolean endStream) {
            executor.execute(() -> {

                // transportError不等于null,表示处理响应头时就有问题了
                if (transportError != null) {
                    transportError.appendDescription(
                        "Data:" + data.toString(StandardCharsets.UTF_8));
                    // 释放内存空间
                    ReferenceCountUtil.release(data);
                    //
                    if (transportError.description.length() > 512 || endStream) {
                        handleH2TransportError(transportError);

                    }
                    return;
                }

                if (!headerReceived) {
                    handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
                        "headers not received before payload"));
                    return;
                }

                // 接收到响应体数据后,把数据添加到accumulate中进行保存
                deframer.deframe(data);
            });
        }

ClientStream.ClientTransportListener#onData
-> TriDecoder#deframe
-> TriDecoder#deliver
-> TriDecoder#processBody
-> ClientCall.ClientStreamListenerImpl#onMessage
-> listener.onMessage(unpacked);有两种方式:同步和异步
UnaryClientCallListener#onMessage同步
ObserverToClientCallListenerAdapter#onMessage异步

ObserverToClientCallListenerAdapter#onMessage异步

    public void onMessage(Object message) {
        // 接收到一个响应结果,回调StreamObserver
        delegate.onNext(message);
        // 继续处理下一个响应结果
        if (call.isAutoRequestN()) {
            call.requestN(1);
        }
    }

上面的delegate就是服务消费者这边创建的ZhouyuResultStreamObserver:

    static class ZhouyuResultStreamObserver implements StreamObserver<String> {


        @Override
        public void onNext(String data) {
            System.out.println(data);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println(throwable);
        }

        @Override
        public void onCompleted() {
            System.out.println("complete");
        }
    }

梳理一下客户端AbstractInvoker#invoke整个流程

  • 1)AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
    1-1)TripleInvoker#doInvoke
     A)UNARY,invokeUnary()返回是一个正常的CompletableFuture,未完成,后面就会阻塞。
     B)SERVER_STREAM,invokeServerStream()返回return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation);直接返回一个已经完成了的CompletableFuture,外层invoke阻塞的地方将直接通过,异步的效果。
     C)CLIENT_STREAM和BI_STREAM,invokeBiOrClientStream()返回的也是一个已经完成了的CompletableFuture。
  • 2) waitForResultIfSync(asyncResult, invocation);如果需要同步就阻塞
    2-1)responseFuture.get(timeout, unit),这里就会判断responseFuture,如果没有完成就阻塞。

同步阻塞什么时候唤醒?

  • 服务提供者调用StreamObserver#onCompleted后,服务消费者就会调用到UnaryClientCallListener#onClose
  • 这里future.received(status, result);触发DeadlineFuture完毕(调用CompletableFuture#complete),解开TripleInvoker的invoke方法中的同步阻塞

4.3.4 SERVER_STREAM的一个BUG

服务提供者逻辑如下。

    public void sayHelloServerStream(String name, StreamObserver<String> response) {

        response.onNext(name + " hello");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.onNext(name + " world");

        response.onCompleted();

    }

预期是服务消费者拿到一个数据帧后,停3s,然后再拿到后续的数据。但是Dubbo这里是直接等3s,然后一下子拿到了所有三个数据。

整体流程:

  • 服务提供者处理请求TripleHttp2FrameServerHandler#onDataRead
    1)executor.execute(() -> doOnData(data, endStream));这里线程池是SerializingExecutor
    2)利用一个线程去执行SerializingExecutor中的run方法,从runQueue获取任务执行,也就是执行上面的lambda表达式。
  • 服务提供者获取到请求体后就会调用invoker.invoke(invocation)执行具体的方法;
  • 在服务方法里面会调用responseObserver.onNext,也即ServerCallToObserverAdapter#onNext向客户端发送数据。
    1)executor.execute(writeMessage);将Runnable添加到队列中,此处的executor为SerializingExecutor(SerializingExecutor(ThreadPoolExecutor)),内部的SerializingExecutor(ThreadPoolExecutor)就是用来接收请求数据的。
    执行效果:
    A)首先将writeMessage加入到外面的SerializingExecutor队列中;
    B)然后执行里面SerializingExecutor的execute时,将writeMessage加入到里面SerializingExecutor的队列中,然后由于里面SerializingExecutor的状态已经改为了true,所以就没法执行下去,直接返回了,所以这了数据并没有发送出去。
  • 那什么时候数据才能发出去呢?就是要里面SerializingExecutor的状态改为false才可以,这时候需要整个方法sayHelloServerStream()都执行完后才行。

建议更改:

  • ServerCall#writeMessage中executor.execute(writeMessage)直接改为doWriteMessage(message),不需要添加到线程池,直接发送即可。

SerializingExecutor#execute

    public void execute(Runnable r) {
        // SerializingExecutor会对加入到runQueue中的Runnable用一个线程进行串行处理

        // 将Runnable任务添加到队列
        runQueue.add(r);

        // 使用内部线程池executor中的一个线程来运行
        schedule(r);
    }

SerializingExecutor#schedule

    private void schedule(Runnable removable) {
        if (atomicBoolean.compareAndSet(false, true)) {
            boolean success = false;
            try {
                // SerializingExecutor内部保护了一个线程池executor,这个线程池是根据服务url创建出来的
                // 注意:这里并不是把runQueue队列中的Runnable任务拿出来用线程去执行
                // 而是把SerializingExecutor自己作为一个Runnable交给线程池中的一个线程去执行
                // 这里其实就是利用一个线程去执行SerializingExecutor中的run方法,从而获取runQueue中的任务进行执行
                executor.execute(this);
                success = true;
            } finally {
                // It is possible that at this point that there are still tasks in
                // the queue, it would be nice to keep trying but the error may not
                // be recoverable.  So we update our state and propagate so that if
                // our caller deems it recoverable we won't be stuck.
                if (!success) {
                    if (removable != null) {
                        // This case can only be reached if 'this' was not currently running, and we failed to
                        // reschedule.  The item should still be in the queue for removal.
                        // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
                        // throw if the item to remove is null.  If removable is present in the queue twice,
                        // the wrong one may be removed.  It doesn't seem possible for this case to exist today.
                        // This is important to run in case of RejectedExecutionException, so that future calls
                        // to execute don't succeed and accidentally run a previous runnable.
                        runQueue.remove(removable);
                    }
                    atomicBoolean.set(false);
                }
            }
        }
    }

SerializingExecutor#run

    public void run() {
        Runnable r;
        try {
            while ((r = runQueue.poll()) != null) {
                try {
                    r.run();
                } catch (RuntimeException e) {
                    LOGGER.error("Exception while executing runnable " + r, e);
                }
            }
        } finally {
            atomicBoolean.set(false);
        }

        // 如果队列中不为空,则继续获取一个线程执行run(),继续获取队列中的任务进行执行
        if (!runQueue.isEmpty()) {
            // we didn't enqueue anything but someone else did.
            schedule(null);
        }
    }

相关文章

网友评论

      本文标题:Dubbo 3.0源码剖析

      本文链接:https://www.haomeiwen.com/subject/ygbokdtx.html