FixedThreadPool
public Executor getExecutor(URL url) {
//获取指定的线程名称
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
//获取指定的线程数量
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
//获取queue来判定创建阻塞队列
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
//通过ThreadPoolExecutor来创建线程池
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
FixThreadPool内部是通过ThreadPoolExecutor来创建线程,核心线程数和最大线程数都是上下文中指定的线程数量threads,因为不存在空闲线程所以keepAliveTime为0,
当queues=0,创建SynchronousQueue阻塞队列;
当queues<0,创建无界的阻塞队列LinkedBlockingQueue;
当queues>0,创建有界的阻塞队列LinkedBlockingQueue。
采用dubbo自己实现的线程工厂NamedInternalThreadFactory,将线程置为守护线程(Demon)
拒绝策略为AbortPolicyWithReport,策略为将调用时的堆栈信息保存到本地文件中,并抛出异常RejectedExecutionException
CachedThreadPool
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
CachedThreadPool与FixedThreadPool的区别是核心线程数和最大线程数不相等,通过alive来控制空闲线程的释放
LimitedThreadPool
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
LimitedThreadPool与CachedThreadPool的区别是空闲线程的超时时间为Long.MAX_VALUE,相当于线程数量不会动态变化了,创建的线程不会被释放。
EagerThreadPool
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
与上述三种线程池不同,EagerThreadPool并非通过JUC中的ThreadPoolExecutor来创建线程池,而是通过EagerThreadPoolExecutor来创建线程池,EagerThreadPoolExecutor继承自ThreadPoolExecutor,实现自定义的execute方法,采用的阻塞队列是TaskQueue,TaskQueue继承自LinkedBlockingQueue。
//构造方法
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
//execute方法
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
//
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
execute方法首先调用ThreadPoolExecutor的execute方法,如果执行失败会重新放入TaskQueue进行重试。
实现自定义的ThreadPool
ThreadPool被定义为一个扩展点,如下所示,
@SPI("fixed")
public interface ThreadPool {
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url);
}
其默认实现是FixedThreadPool,可以通过实现该扩展来实现自定义的线程池策略。
网友评论