美文网首页
Dubbo-线程池

Dubbo-线程池

作者: 我可能是个假开发 | 来源:发表于2023-02-01 20:09 被阅读0次

一、Dubbo中的线程池

提供了三种线程池的实现:

  • fixed:固定大小的线程池,启动时建立,并且不会关闭,这也是缺省的设置。
  • cached:缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited:可伸缩线程池,但是线程池中的线程数只会增长不会收缩,这样做的目的是为了避免当进行收缩时流量突然增加造成性能问题。
image.png

二、解决线程池耗尽

@DubboService
@Component
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {
    @Override
    public OrderInfo queryOrderById(String id) {
        // 这里模拟执行一段耗时的业务逻辑
        sleepInner(5000);
        OrderInfo resultInfo = new OrderInfo(
                "GeekDubbo",
                "服务方异步方式之RpcContext.startAsync#" + id,
                new BigDecimal(129));
        return resultInfo;
    }
}

如果 queryOrderById 这个方法的调用量上来了,很容易导致 Dubbo 线程池耗尽。因为 Dubbo 线程池总数默认是固定的,200 个,假设系统在单位时间内可以处理 500 个请求,一旦 queryOrderById 的请求流量上来了,极端情况下,可能会出现 200 个线程都在处理这个耗时的任务,那么剩下的 300 个请求,即使是不耗时的功能,也很难有机会拿到线程资源。所以,紧接着就导致 Dubbo 线程池耗尽了。

解决:
为了让这种耗时的请求尽量不占用公共的线程池资源,这段代码优化成异步形式。核心问题是异步化的时候需要拿到返回结果。
选择线程池的实现,如果能有一个存储媒介来存储异步化的结果,然后把存储媒介中的数据取出来返回回去。
在 queryOrderById 方法中开始异步化分支处理,紧接着在异步化分支中得到异步结果,然后把异步结果存储到某个地方,最后再取出这个异步结果并返回。

处理请求时共同的必经之路:
寻找一种可以拦截所有方法的流程机制,在拦截处拿到异步结果并返回:


image.png

因为 queryOrderById 走了异步分支,可能导致最终什么也没拦截到,所以要让拦截处想办法感知到 queryOrderById 内部实现是否走了异步处理,从代码层面上,引入一个变量,让拦截处一旦感知到业务接口(比如这里的 queryOrderById)开启了异步化模式处理,就可以理所当然地直接从存储异步结果的地方,把结果取回并返回。
采用上下文对象来存储,异步化的结果存储在上下文对象中。首先拦截识别异步,当拦截处发现有异步化模式的变量,从上下文对象中取出异步化结果并返回。
使用java.util.concurrent.Future
java.util.concurrent.CompletableFuture#get(long timeout, TimeUnit unit) 方法支持传入超时时间。

@DubboService
@Component
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {
    @Override
    public OrderInfo queryOrderById(String id) {
        // 创建线程池对象
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        
        // 开启异步化操作模式,标识异步化模式开始
        AsyncContext asyncContext = RpcContext.startAsync();
        
        // 利用线程池来处理 queryOrderById 的核心业务逻辑
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                // 将 queryOrderById 所在线程的上下文信息同步到该子线程中
                asyncContext.signalContextSwitch();
                
                // 这里模拟执行一段耗时的业务逻辑
                sleepInner(5000);
                OrderInfo resultInfo = new OrderInfo(
                        "GeekDubbo",
                        "服务方异步方式之RpcContext.startAsync#" + id,
                        new BigDecimal(129));
                System.out.println(resultInfo);
                
                // 利用 asyncContext 将 resultInfo 返回回去
                asyncContext.write(resultInfo);
            }
        });
        return null;
    }
}

在 queryOrderById 中创建了一个线程池,然后把 Runnable 内部类放到线程池中去执行,并且存在一个上下文信息的传递动作,最后在 Runnable 实现类中,将异步结果写入到上下文对象。

  • 定义线程池对象,通过 RpcContext.startAsync 方法开启异步模式;
  • 在异步线程中通过 asyncContext.signalContextSwitch 同步父线程的上下文信息;
  • 在异步线程中将异步结果通过 asyncContext.write 写入到异步线程的上下文信息中。

三、Dubbo 异步实现原理

1.定义线程池对象

在 Dubbo 中 RpcContext.startAsync 方法意味着异步模式的开启:

image.png
通过 CAS 原子性的方式创建了一个 java.util.concurrent.CompletableFuture 对象,这个对象就存储在当前的上下文 org.apache.dubbo.rpc.RpcContextAttachment 对象中。

2.在异步线程中同步父线程的上下文信息

image.png
asyncContext.signalContextSwitch 同步不同线程间的信息,也就是信息的拷贝,只不过这个拷贝需要利用到异步模式开启之后的返回对象 asyncContext。因为 asyncContext 富含上下文信息,只需要把这个所谓的 asyncContext 对象传入到子线程中,然后将 asyncContext 中的上下文信息充分拷贝到子线程中,这样,子线程处理所需要的任何信息就不会因为开启了异步化处理而缺失。

3.在异步线程中,将异步结果写入到异步线程的上下文信息中

// org.apache.dubbo.rpc.AsyncContextImpl#write
public void write(Object value) {
    if (isAsyncStarted() && stop()) {
        if (value instanceof Throwable) {
            Throwable bizExe = (Throwable) value;
            future.completeExceptionally(bizExe);
        } else {
            future.complete(value);
        }
    } else {
        throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");
    }
}

Dubbo 用 asyncContext.write 写入异步结果,通过 write 方法的查看,最终的异步化结果是存入了 java.util.concurrent.CompletableFuture 对象中,这样拦截处只需要调用 java.util.concurrent.CompletableFuture#get(long timeout, TimeUnit unit) 方法就可以很轻松地拿到异步化结果了。

四、异步应用场景

定义了线程池,可能会认为定义线程池的目的就是为了异步化操作,其实不是,因为异步化的操作会使 queryOrderById 方法立马返回,也就是说,异步化耗时的操作并没有在 queryOrderById 方法所在线程中继续占用资源,而是在新开辟的线程池中占用资源。

可以采用异步的场景:

  • 一些 IO 耗时的操作,比较影响客户体验和使用性能
  • 某段业务逻辑开启异步执行后不太影响主线程的原有业务逻辑。
    因为 queryOrderById 开启异步操作后就立马返回了,queryOrderById 所在的线程和异步线程没有太多瓜葛,异步线程的完成与否,不太影响 queryOrderById 的返回操作。
  • 时序上没有严格要求的业务逻辑
    在 queryOrderById 这段简单的逻辑中,只开启了一个异步化的操作,站在时序的角度上看,queryOrderById 方法返回了,但是异步化的逻辑还在慢慢执行着,完全对时序的先后顺序没有严格要求。

极客时间《Dubbo 源码剖析与实战》学习笔记Day18 - http://gk.link/a/11VBp

相关文章

  • Dubbo-线程池

    一、Dubbo中的线程池 提供了三种线程池的实现: fixed:固定大小的线程池,启动时建立,并且不会关闭,这也是...

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

网友评论

      本文标题:Dubbo-线程池

      本文链接:https://www.haomeiwen.com/subject/okgihdtx.html