简介
随着硬件的不断升级,多线程处理能力越来越强,很多时候如果不能合理利用多线程处理反而是对于性能的一种浪费,所以在开发中应该适当的利用这种能力。
public class ThreadPoolExecutor extends AbstractExecutorService {
以下代码均来源Android的API25的源码
通过ThreadPoolExecutor的注释可以大致了解线程池的作用,平常在使用的过程中可能直接new Thread()也可以实现在子线程运行任务,但是通过ThreadPoolExecutor可以做到线程的管理和复用,从而做到资源的节省和有效利用,应该是使用多线程的不二选择。
介绍
/**
* 通过指定的参数建立新的线程池
*
* @param corePoolSize 除非设置allowCoreThreadTimeOut,
* 否则线程池中会一直保留这个数量的线程,哪怕这些线程处于空闲状态
* @param maximumPoolSize 线程池中允许的最大线程数量,必须大于corePoolSize
* @param keepAliveTime 线程池中的线程数量已经大于corePoolSize,并且在线程池terminating之前,
* 那些超过corePoolSize的线程的最大空闲时间,用于等待新的任务,超过会被回收
* @param unit keepAliveTime参数的单位
* @param workQueue 用于持有在那些还没有执行的任务的队列,该队列只能持有通过execute方法传入的Runnable任务
* @param threadFactory 线程池用于创建线程的工厂
* @param handler 线程池中的线程达到 maximumPoolSize,并且workQueue已经满了的情况下,用于在还有新任务进入的时候的回调
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
接下来重点了解一下核心参数的意义,从中可以了解线程池的基本运作原理
corePoolSize/maximumPoolSize
corePoolSize:线程池中核心线程的数量。
maximumPoolSize:线程池中最大可容纳线程的数量,必须大于等于corePoolSize。
结合上面两个参数介绍一下,当一个新的Runnable任务通过execute进入线程池中执行的时候
1.如果当前线程池中的线程数目还没有达到corePoolSize,不管是否有其它空闲的线程,都会直接新建线程执行用于执行。
2.如果当前线程池中的线程数量已经大于等于corePoolSize,但是小于maximumPoolSize,此时只会在workQueue已经满了的情况下直接新建线程执行,否则会将当前任务放入workQueue中等待下次执行。
/**
* 设置核心线程数
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;//计算当前核心线程数的变化
this.corePoolSize = corePoolSize;//设置新的核心线程数
if (workerCountOf(ctl.get()) > corePoolSize)//当前线程池中的线程大于核心线程数
interruptIdleWorkers();//回收那些空闲的线程
else if (delta > 0) {//当前核心线程数增加
// delta为当前想要增加的数量,workQueue.size()为当前等待中的任务
// 获得当前可以尝试开始执行的任务的数量
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {//尝试添加任务
if (workQueue.isEmpty())
break;
}
}
}
/**
* 设置最大线程数
*/
public void setMaximumPoolSize(int maximumPoolSize) {
//最大线程数必须比之前的核心线程数大,同时修改核心线程和最大线程的时候需要注意
//最好先修改核心线程数目
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)//当前线程池中的线程大于最大线程数
interruptIdleWorkers();//回收那些空闲的线程
}
可以通过setCorePoolSize和setMaximumPoolSize修改线程池中的配置,这个在Android中常用于网络状态的变化从而修改线程池的能力,比方说弱网状态下因为请求慢,实际上并不需要那么高的并发能力,同时可以回收一些空闲线程(可以参考Picasso的处理)
/**
* 尝试开启一个核心线程
* @return true表示线程开启成功,否则失败
*/
public boolean prestartCoreThread() {
//当前线程池中的线程少于核心线程数,尝试开启核心线程,但是没有实际运行的任务
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* 尽可能的开启核心线程
* @return 实际开启的核心线程数量
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))//尽可能的开启核心线程
++n;
return n;
}
虽然线程池在执行线程的时候会自动管理核心线程,但是有的时候可能构建线程池的时候queue中已经有任务,此时可以手动开启核心线程,此时会自动执行queue中的任务。
keepAliveTime/unit
keepAliveTime:如果线程池中的线程数目大于核心线程数目,超过核心线程数目的那些线程如果空闲超过keepAliveTime指定的时间,那么这些线程会被回收。
unit :keepAliveTime的单位,最终在ThreadPoolExecutor中都会转为nano来计算。
/**
* 设置超过核心线程数的那些线程的最大存活时间(空闲时间)
* 超过之后线程会被回收
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);//转换为纳秒
long delta = keepAliveTime - this.keepAliveTime;//计算当前存活时间和之前存活时间的差值
this.keepAliveTime = keepAliveTime;//设置新的存活时间
//存活时间变少了
if (delta < 0)
interruptIdleWorkers();//将所有空闲的线程都回收了
}
/**
* 设置核心线程是否允许超时回收策略
* @since 1.6
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)//允许超时回收,先回收所有空闲的线程
interruptIdleWorkers();
}
}
注意到其实核心线程也可以设置超时回收机制,有的时候如果确认当前线程池会在一段比较长的时间之后才会重新使用,可以考虑回收核心线程。
workQueue
工作队列的使用有以下几种情况:
1.如果线程池中的线程数目小于核心线程数,则execute之后会先直接新建线程执行,而不会放入queue中。
2.如果线程池中的线程数目大于核心线程数,则execute之后会优先将任务放入到queue中。
3.如果queue中无法再放入任务,会尝试新建线程直接执行任务,但是此时如果达到maximumPoolSize,此时该任务会执行拒绝策略。
常用的典型队列有三种:
1.SynchronousQueue:同步队列,这种队列在插入的时候需要等待之前的数据出去,同一时刻队列中只有一个数据,这就意味这新任务尝试进入队列的时候,都会失败,此时会尝试立刻新建一个线程执行任务,所以说这种一般需要配合足够大的maximumPoolSize,从而避免大量的任务被拒绝。这种队列可以用于尽可能的利用线程执行,从而不阻塞任务。
2.LinkedBlockingQueue:没有大小限制的队列,这意味着很难出现任务进入队列失败的情况,也就不会立刻新建线程执行,优先会将任务存在队列当中等待执行,这样的线程池中也就不会创建超过corePoolSize数量的线程,线程数目比较固定。
3.ArrayBlockingQueue:有大小限制的队列,这意味着在大小限制之内,任务会等待执行,超过之后会新建线程执行。这种情况下需要合理的设置队列的大小和maximumPoolSize。小的队列加上大的maximumPoolSize,会导致CPU非常的频繁,这样会导致一些需要快速计算的任务的效率下降。大的队列加上小的maximumPoolSize,这种情况不适宜有阻塞性操作,因为CPU利用率有限,也会导致效率下降。
ThreadFactory
用于创建新的线程
/**
* ThreadPoolExecutor默认使用的线程工厂
*/
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();//Android默认为null
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);//指定了相同的线程组
if (t.isDaemon())
t.setDaemon(false);//默认都是非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);//默认优先级为中等
return t;
}
}
一般来说不需要自己复写,除非需要指定线程的名称优先级之类的。
RejectedExecutionHandler
拒绝策略,前面提到,如果在线程池线程已经达到maximumPoolSize,而且当前有新的任务到来,但是queue无法容纳,此时会采用拒绝策略。ThreadPoolExecute提供了几个默认策略:
1.AbortPolicy:直接抛出RejectedExecutionException异常。
2.CallerRunsPolicy:如果当前线程池没有shutDown,则直接在execute的调用线程中执行任务。
3.DiscardPolicy:什么都不做。
4.DiscardOldestPolicy:如果当前线程池没有shutDown,尝试将poll队列的第一个任务(相当于丢弃),然后重新execute的当前任务。
默认的线程池
实际上Executors提供了一些常用的线程池,一般来说可以直接使用
/**
* 核心线程就是最大线程数
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 有且只有一个线程工作的线程池
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 没有核心线程池,任务来了就执行,每一个线程都有60s的空闲时间
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//通过DelayedWorkQueue实现延时从队列中获取任务,起到延时执行的任务
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
1.newFixedThreadPool:核心线程和最大线程一致,这意味着线程池中最多只有核心线程数目的线程,多余的任务会被放到队列当中,等待执行。一般可用于CPU资源有限的场景,比方说Android手机CPU核心数目有限,有的相对耗时的任务可以通过这种方式处理。
2.newSingleThreadExecutor:只有一个线程工作的线程池,这也意味着串行执行。
3.newCachedThreadPool:没有核心线程,但是最大线程数很大,一有任务到来,如果有线程闲着,直接执行,否则新建线程执行。
4.newScheduledThreadPool:指定核心线程数目,这个一般用于计划任务,比方说3s执行一个任务。
部分代码
看一下ThreadPoolExecutor的一些关键代码
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//0001 1111 .... 1111共32位
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//1010 .... 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;//0000 .... 0000
private static final int STOP = 1 << COUNT_BITS;//0010 .... 0000
private static final int TIDYING = 2 << COUNT_BITS;//0100 .... 0000
private static final int TERMINATED = 3 << COUNT_BITS;//0110 .... 0000
//通过与1111 0000 .... 0000 得到最高的四位,即状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//通过与0000 1111 .... 1111 得到低28位,即线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//将高四位状态和低28线程数或运算,组合成一个int值
private static int ctlOf(int rs, int wc) { return rs | wc; }
先看一下基础设计,一个int占4个字节,一共32位。
这里定义了5种状态,在最高位表示即
1111 0000 .... 0000 中四个1的位置是用于表示当前线程池所处状态。
其余28为是用于表示当前线程池运行数量。
Atomic作为一个线程安全的容器,里面组合了当前线程池状态和当前线程数。即
(0000)(线程池状态) (0000 ... 0000)(线程数量)
/**
* 执行某一个任务
* 可能直接执行当前任务,也可能执行队列中第一个元素,也可能被拒绝
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//获得当前线程池状态和线程数目
if (workerCountOf(c) < corePoolSize) {//线程池中的线程小于核心线程数
if (addWorker(command, true))//尝试直接新建线程执行
return;
c = ctl.get();//当前状态可能改变,重新获取当前线程池状态和线程数目
}
//当前线程池运行中,并且成功往队列中添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();//重新检查当前线程池状态和线程数目
//当前线程池已经不处于运行状态,此时不应该接收新的任务,并且回调拒绝策略,标示当前任务因为线程池已经停止运行而被拒绝,但是当前任务之前已经成功添加到队列中,所以要重新将当前任务从队列中移除,从而避免SHUTDOWN状态下执行了队列中的当前任务
//移除失败则队列不可能为空,此时还是尝试将队列中的任务执行
//否则terminated线程池,队列会清空,并且线程数被标记0
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)//当前线程池运行中
addWorker(null, false);//如果当前线程池中有空闲线程,尝试执行队列中第一个元素
} else if (!addWorker(command, false))//当前线程池已经关闭或者当前添加任务到队列失败,尝试将任务执行
reject(command);//此时如果任务执行失败(线程池已经关闭必然失败),采用拒绝策略
}
/**
* 添加任务并且执行
* @param firstTask 当前需要执行的任务,为null的话会尝试执行队列中的第一个元素
* @param core 是否为核心线程,如果是核心线程,要求当前线程数必须小于corePoolSize
* 否则对比maximumPoolSize最大线程数
* @return true表示执行成功,否则失败
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//获得当前线程池状态和线程数
int rs = runStateOf(c);//获得当前线程池状态
//当前线程池处于SHUTDOWN状态,如果队列中还有任务,则会继续执行任务
//如果当前线程池处于SHUTDOWN之后的状态,那么都不会执行任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//获得当前线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;//根据是否是核心线程,决定是否接收当前任务
//线程数+1,跳出循环,一般来说如果没有多线程导致数据异常都是直接通过
if (compareAndIncrementWorkerCount(c))
break retry;
//可能有其他线程修改了数据
c = ctl.get(); //重新获得当前线程池状态和线程数
if (runStateOf(c) != rs)//当前线程池状态被修改了
continue retry;//回到retry:位置重新执行,主要是重新进行线程池是否应该继续执行等逻辑
//如果其他线程只是修改了线程数目,继续当前循环判断线程数和允许的线程数即可
}
}
//来到这里,意味着要开始执行任务
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//假设之前因为获取锁失败被阻塞了一段时间
//那么这里需要重新获得当前线程池状态和线程数
//获得线程池允许状态
int rs = runStateOf(ctl.get());
//当前线程池运行中
//当前线程池已经SHUTDOWN,但是依然可以执行队列中的元素
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) //线程不应该重复启动
throw new IllegalThreadStateException();
workers.add(w);//添加工作线程到集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;//标记工作线程添加成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//工作线程成功添加
t.start();//执行当前线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/**
* 新建Worker,会通过ThreadFactory创建一个新的线程
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
/**
* 运行指定线程
* @param w
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//先尝试执行指定的任务,否则不断的从队列中取任务来执行
//实际上可以看到,只要当前线程没有被interrupted,实际上就是一直在复用
//getTask中处理线程的回收机制
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
//一旦立刻这个循环,线程就会被回收
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/**
* 尝试从队列中获取任务
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 当前线程池已停止或者队列以空,当前线程不需要继续执行
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 当前线程是否会超时
//如果核心线程被标记会超时,那么所有线程都会超时
//如果核心线程不会超时,但是当前线程池中线程数量大于核心线程数,那么直到核心线程的数量为止,要尝试回收那些多余线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//当前任务不能多于最大线程数
//当前任务开始计时,并且已经等待队列keepAliveTime,但是依然没有获取到任务,超时返回
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))//回收当前线程
return null;
continue;
}
try {
//这里是处理超时的核心
//如果当前线程会超时,那么通过阻塞队列的等待机制获取任务,如果在指定的时间内没有获取到任务,那么任务为空,如果这个时间内有任务进入队列,阻塞会被唤醒,此时获取到最新的任务并且返回到runWorker中去执行
//如果当前线程不会超时,直接获取一次任务,如果获取成功则直接回到runWorker中继续循环,否则继续循环尝试获取数据(但是不会返回null,从而使得线程执行完毕,不可复用)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;//等待空闲时间还是没有获得新的任务,标记超时,下一次循环会跳出
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程在工作的时候会不停地从队列中获取任务来执行,从而起到复用的作用。通过阻塞队列的等待特性来实现空闲时间后跳出循环,从而回收(结束)当前线程。
总结
实际开发中应该根据情况决定使用哪一类线程池
比方说一些执行快而且执行频繁的可以采用Executors.newCachedThreadPool()
实际开发中在采用Executors.newFixedThreadPool()这种线程池的时候,可能还是要考虑一下队列最大数量(不应该无上限),免得造成内存溢出
串行执行可以使用Executors.newSingleThreadExecutor()
总的来说,在一个会较多依赖多线程的工程下,还是尽可能的使用线程池来管理。
网友评论