之前对于ThreadPoolExecutor的几个参数corePoolSize,maximumPoolSize,keepAliveTime,BlockingQueue的理解是:
corePoolSize:核心线程数
maximumPoolSize:线程池最大线程数,包括coreThread,所以应大于等于corePoolSize
keepAliveTime:允许存活时间,线程池是为了避免线程的频繁创建和销毁,所以当非核心线程没有任务要做时,会等待keepAliveTime时间。如果超过keepAliveTime时间没有任务到来,就销毁。核心线程会一直存在,即使一直没有任务到来。
BlockingQueue:任务队列,当线程池线程数超过maximumPoolSize时,将任务存入BlockingQueue;当BlockingQueue满时,拒绝并丢弃之后来的任
务。如果线程池中线程当前任务执行完毕后,会从BlockingQueue中取任务,继续执行。
最近发现,当线程池线程数超过maximumPoolSize时,将任务存入BlockingQueue理解是错误的。
发现的缘由,用线程池实现上传多张图片功能。想在线程池执行完毕后,不再保留线程,所以将corePoolSize设为0;想最多有两个线程执行,maximumPoolSize设置为2;存活时长设为60;任务多时,能无限存储,BlockingQueue使用LinkedBlockingQueue。代码如下:
ExecutorService executorService = new ThreadPoolExecutor(0, 2,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
运行一下,没有问题。过了好久发现,所有的任务都在同一个线程中执行,已经很久了,有坑!
想不明白为什么,只好看ExcutorService的源码。任务是通过executorService.execute()方法执行的,就从execute()方法看起:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 1
if (addWorker(command, true)) // 1.5
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 2.1
reject(command);
else if (workerCountOf(recheck) == 0) // 2.5
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在1处,判断当前线程池中的线程数目是否小于核心线程数,如果小于,在1.5处创建核心线程,如果创建成功,就return。注意,此处传入了command,应该是创建完成后就直接执行。
因为我设置到corePoolSize为0,所以1.5不会执行,执行到2。
在2处,判断线程池是否处于running的状态,如果处于running状态,任务是否成功添加到workQueue中!!!
注意了!!!当线程数目大于核心线程数后,就会将任务添加到workQueue中!!!
2处,线程池为running状态,且LinkedWorkQueue能添加无限任务,所以2为true,会在2.1处再次判断线程池是否running,此处为runnig,则走到2.5,判断线程池的数目是否为0,如果为0,则创建非核心线程。注意,此处传入null,应该是在线程执行中,从workQueue中取任务来执行。令人意外的是,如果线程池数目不为0,那么这个方法就结束了!!!只做了将任务添加到workQueue中一件事!
那接着看下工作者线程的执行:
final void runWorker(Worker w) {
// ...省略无关代码
try {
while (task != null || (task = getTask()) != null) { // 1
// ...省略无关代码
try {
task.run(); // 2
// ...省略无关代码
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
只有两步:1 取任务 2 任务执行, 看getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 1
if (r != null)
return r;
timedOut = true;
}
}
从workQueue中取任务执行 那么,回到最初的代码
ExecutorService executorService = new ThreadPoolExecutor(0, 2,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
当首次调用execute()方法时,由于核心线程数为0, 所以只创建了一个非核心线程;之后再调用execute()方法,只将runnable放入LinkedBlockingQueue中,然后,一个线程不断得循环,从workQueue中取任务执行,直到没有任务后,这个就会等待60秒,如果60秒内无新任务到来,这个线程就退出了。
所以,我成功得创建了一个单线程的线程池,此时无论传入的maximumPoolSize为多少,都只会创建最多一个线程!
正确的使用方法:
val executorService = Executors.newFixedThreadPool(3)
或者使用RxJava
Observable.from(photos)
.onBackpressureBuffer()
.flatMap({ upload(it) }, 3)
.buffer(photos.size)
.flatMap {
Observable.just(zipPairs(it.toTypedArray()))
}
.subscribeOn(Schedulers.io())
————————————————
版权声明:本文为CSDN博主「ZHxin」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ZHXLXH/article/details/85342570
网友评论