首先在使用线程池的时候有今个问题:
Q1:什么是多线程?
Q2:为什么要用到线程池?
Q3:线程池那么多参数都有什么用?
Q4:当我们提交一个任务的时候到底发生了什么?
Q5:线程池里的任务是怎么执行的?
Q6:阻塞队列到底有什么用?
什么是多线程,为什么要用到多线程
当任务不是CPU密集型操作时,比如是网络、数据库等IO密集型操作,单个线程会阻塞很长时间导致CPU等待时间的浪费,在多线程模型下CPU可以在这段时间进行切换去做其他的任务,可以最大程度发挥CPU性能,在多核的设备下这种效果更明显。
当然在Android中是不允许操作阻塞主线程的,这也就强制开发者要用多线程技术。
为什么要用到线程池?
线程的时间包括创建时间+运行时间+销毁时间
而线程池有什么作用的,线程池顾名思义就是放线程的池子,里面放了一条条线程,当有任务来的时候就去池子里拿一条现成的线程直接执行任务,这就节省了创建和销毁时间。
线程池的参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心线程数,一旦开启就存活的线程,可以设置超时参数
- maximumPoolSize 最大线程数,核心线程数+非核心线程数
- keepAliveTime 线程空闲时间,超过这个时间线程被回收
- unit 时间单位
- workQueue 保存任务的队列
- threadFactory 线程工厂
- handler 线程被拒绝的处理者
创建线程池,参考AsyncTask中的线程池:
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
这里用到了默认的handler:
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
所以在拒绝任务的时候会抛异常,所以当线程池不够大时要注意这个参数。
通常结合具体的使用场景corePoolSize,maximumPoolSize,workQueue会使用不同的参数,下面先介绍任务被加入的过程,才能更好理解这些参数。
任务被加入的过程
有几个重要的变量
- ctl 是一个32位的整数,高三位存储线程的状态,低29位表示工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int COUNT_BITS = Integer.SIZE - 3;
CAPACITY 高三位为0,后29位为1
怎么计算线程池的状态:
private static int runStateOf(int c) { return c & ~CAPACITY; }
返回的是c的高3位
计算工作线程数量也一样
private static int workerCountOf(int c) { return c & CAPACITY; }
返回c的低29位
线程池的状态
线程池的状态
- Running :就是正在运行,线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
- SHUTDOWN:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务,调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
- STOP:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP
- TIDYING:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
- TERMINATED:线程池彻底终止,就变成TERMINATED状态
任务加入的过程
任务加入过程提交任务有两种方式,第一种方式是
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
当执行了execute(Runnable r)方法之后,先判断当前线程的数量是否小于核心线程数,如果是就直接新建一个线程去执行,如果核心线程已满或者上一步新建出错,就将任务加入任务队列,如果加入队列成功就再次检查当前的线程池状态,因为加入队列的时候线程池的状态可能改变了,如果不在运行就移除任务执行拒绝策略,如果没有工作线程就开启一个线程;如果加入队列不成功,说明队列已满,直接开启新线程去执行任务,如果再开启不了新线程,就说明线程池关闭了或者线程满了,执行拒绝策略。
过程很复杂,总的来说就是有任务先判断核心线程是否已满,如果没有就放到核心线程,如果满了就添加进队列,如果队列满了就创建非核心线程,如果都满了就拒绝,当有线程空闲下来,就去队列里面取任务去执行。
线程池为什么要设计一个阻塞队列,而且只有队列满了才开启非核心线程呢?
主要是因为我们很多时候不希望一有任务来就开启非核心线程,而是先让任务排队,等对拍满了,才会迫不得已开启非核心线程,这样能最大程度节约资源。
线程池中的线程是怎么运行的
创建线程用到了
private boolean addWorker(Runnable firstTask, boolean core)
首先判断运行状态,然后更新工作线程数
然后创建一个Worker,执行真正的任务
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
那么加入任务队列的任务怎么执行的
在Worker真正执行的时候会一直在这里循环
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
当线程执行完一个任务就将task==null
在getTask中会去workQueue取任务,直到线程被中断抛出异常或者没有任务执行
try {
// 如果可以超时回收的话就调用workQueue的阻塞方法,在超时之后会返回null,worker的循环结束,然后被回收。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
在线程池中有个很重要的参数就是阻塞队列,以LinkedBlockingQueue为例
提供了三个阻塞方法put,take,poll(long timeout, TimeUnit unit)
其实阻塞队列就是为生产者消费者模型服务的,当队列不为满的时候唤醒生产者,当队列不为空的时候唤醒消费者,所以利用阻塞队列可以很容易的写出一个生产者消费者模型。
在线程池主要是用到了后两个方法,为了超时机制,当我们有需要去写一个超时机制的时候可以借鉴一下阻塞队列。
怎么用线程池?
Java提供了几中常用的线程池
FixedThreadPool : 固定个数的线程池,阻塞队列大小无限,所以用不着非核心线程
CachedThreadPool:只有非核心的线程池,SynchronousQueue是一个空队列,只有当生产者和消费者同时放和取时才起作用,缺少任意一个就会阻塞,就是一旦有线程空下来去队列里取任务的时候,该队列会把到来的任务马上交给该线程,这样做的目的就是可以马上执行任务而且可以充分利用空闲线程
ScheduledThreadPool:核心线程数调用者指定,执行定时任务的线程池,DelayedWorkQueue无限大,非核心线程无效
SingleThreadPool:单个线程的线程池,保证任务顺序执行
关于核心线程数最大线程数以及阻塞队列的大小该怎么设置,有名人说当是CPU密集型任务就是计算较多的情况下线程数应该尽量少一点,如果是IO密集型线程数应该大一点,对客户端来说大部分都是IO密集型,所以线程数大一点没关系,推荐使用上面的cachedThreasPool,OKHttp就是用的这个线程池。
网友评论