美文网首页
dubbo线程池

dubbo线程池

作者: SparkOnly | 来源:发表于2021-03-29 21:51 被阅读0次

类型

dubbo的线程池类型通过SPI定义在org.apache.dubbo.common.threadpool.ThreadPool文件里
总共有以下4类

  • fixed(默认)
    org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
    复用固定数量的线程
public Executor getExecutor(URL url) {
        //threadname, Dubbo
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        //threads, 200
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        //queues, 0
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
  • cached
    org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
    默认线程1分钟后就会被回收,新线程会随着请求的到来被创建
public Executor getExecutor(URL url) {
        //默认Dubbo
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        //默认0
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        //默认0
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        //默认60000,1分钟
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
  • limited
    org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
    需要的时候就创建线程,直到达到上限。线程池不会自动收缩
public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
  • eager
    org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
    核心线程都在使用的时候,对于新任务创建新线程,而不是放入阻塞队列
public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }

EagerThreadPoolExecutor核心过程
记录提交的任务数,如果执行抛出RejectedExecutionException会重新放入队列进行执行

public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }

TaskQueue的核心过程

public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        // 已提交任务数<当前线程池大小,说明有空闲的worker,直接放入队列让worker处理任务
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }
        // return false to let executor create new worker.
        // 当前线程池大小< 最大线程池大小,直接返回false,让线程池创建新的worker
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

初始化过程

  1. NettyClient(消费端)或NettyServer(服务端)初始化后,会初始化线程池。
    通过调用DefaultExecutorRepository#createExecutorIfAbsent。

下面两张图分别为NettyClient和NettyServer初始化时候的设值


消费端
服务端
  • 消费端
    默认采用cached方式
    AbstractClient类:
//String THREADPOOL_KEY = "threadpool";
//String DEFAULT_CLIENT_THREADPOOL = "cached";
 private void initExecutor(URL url) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
        executor = executorRepository.createExecutorIfAbsent(url);
    }
  • 服务端
    默认采用fixed方式
    从SPI注解上获取到类型
@SPI("fixed")
public interface ThreadPool {
    /**
     * Thread pool
     *
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({THREADPOOL_KEY})
    Executor getExecutor(URL url);
}

DefaultExecutorRepository#createExecutor
会动态生成ThreadPool$Adaptive对象,默认获取SPI注解上的参数

private ExecutorService createExecutor(URL url) {
        return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    }

相关文章

  • 3. Dubbo线程池模型

    dubbo有两种线程池,io线程池、业务线程池以netty作为io框架为例:boss线程池: 主要处理新的连接请求...

  • 信号量用法:锁

    用于dubbo线程池满的拒绝策略

  • dubbo线程池

    类型 dubbo的线程池类型通过SPI定义在org.apache.dubbo.common.threadpool....

  • Dubbo线程池

    Dubbo的线程模型与线程池策略 Dubbo默认的底层网络通讯使用的是Netty,服务提供方NettyServer...

  • Dubbo-线程池

    一、Dubbo中的线程池 提供了三种线程池的实现: fixed:固定大小的线程池,启动时建立,并且不会关闭,这也是...

  • [Dubbo]基础组件之ThreadPool

    介绍 ThreadPool 我们在开发过程中经常使用,java线程池的相关知识见线程池相关文章dubbo也不例外会...

  • Dubbo-Client线程池机制

    DUbbo-Client默认的线程池是CachedThreadPool 实际过程中可能会导致client线程堆积 ...

  • 03-Dubbo支持的协议

    1.Dubbo 1.1 Dubbo官方推荐的协议.1.2 本质:使用NIO和线程池进行处理.1.3 缺点:大文件传...

  • dubbo线程池exhausted

    在项目开发中使用dubbo的场景很多, 如果运气好, 会遇到以下异常 Request processing fai...

  • 23.Dubbo线程池策略

    此处的线程池就是上一章说的业务线程池,Dubbo对此也提供了一个ThreadPool的扩展SPI接口 FixedT...

网友评论

      本文标题:dubbo线程池

      本文链接:https://www.haomeiwen.com/subject/xqsbhltx.html