类型
dubbo的线程池类型通过SPI定义在org.apache.dubbo.common.threadpool.ThreadPool文件里
总共有以下4类
- fixed(默认)
org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
复用固定数量的线程
public Executor getExecutor(URL url) {
//threadname, Dubbo
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
//threads, 200
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
//queues, 0
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
- cached
org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
默认线程1分钟后就会被回收,新线程会随着请求的到来被创建
public Executor getExecutor(URL url) {
//默认Dubbo
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
//默认0
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
//默认0
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
//默认60000,1分钟
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
- limited
org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
需要的时候就创建线程,直到达到上限。线程池不会自动收缩
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
- eager
org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
核心线程都在使用的时候,对于新任务创建新线程,而不是放入阻塞队列
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
EagerThreadPoolExecutor核心过程
记录提交的任务数,如果执行抛出RejectedExecutionException会重新放入队列进行执行
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
TaskQueue的核心过程
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
// 已提交任务数<当前线程池大小,说明有空闲的worker,直接放入队列让worker处理任务
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
// 当前线程池大小< 最大线程池大小,直接返回false,让线程池创建新的worker
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
初始化过程
- NettyClient(消费端)或NettyServer(服务端)初始化后,会初始化线程池。
通过调用DefaultExecutorRepository#createExecutorIfAbsent。
下面两张图分别为NettyClient和NettyServer初始化时候的设值


- 消费端
默认采用cached方式
AbstractClient类:
//String THREADPOOL_KEY = "threadpool";
//String DEFAULT_CLIENT_THREADPOOL = "cached";
private void initExecutor(URL url) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
}
- 服务端
默认采用fixed方式
从SPI注解上获取到类型
@SPI("fixed")
public interface ThreadPool {
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url);
}
DefaultExecutorRepository#createExecutor
会动态生成ThreadPool$Adaptive对象,默认获取SPI注解上的参数
private ExecutorService createExecutor(URL url) {
return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
}
网友评论