目录
从零实现ImageLoader(一)—— 架构
从零实现ImageLoader(二)—— 基本实现
从零实现ImageLoader(三)—— 线程池详解
从零实现ImageLoader(四)—— Handler的内心独白
从零实现ImageLoader(五)—— 内存缓存LruCache
从零实现ImageLoader(六)—— 磁盘缓存DiskLruCache
异步加载
既然是异步加载那新开线程自然是必不可少的。一个线程怎么样?这种情况下图片得一个一个依次加载,效率未免太低了。那每张图片新开一个线程怎么样?在图片过多的情况下,线程数量也会迅速随之增长,系统资源消耗太多严重,也不能接受。这时候就是线程池ExecutorService
这个线程管理工具登场的时候了。
public class Dispatcher {
private final String mUrl;
private final ExecutorService mExecutorService;
public Dispatcher(String url, ExecutorService executorService) {
mUrl = url;
mExecutorService = executorService;
}
public void into(ImageView imageView) {
mExecutorService.execute(() -> {
try {
Bitmap image = get();
//这一句将代码切换到主线程,下一篇文章再详细解释
ImageLoader.HANDLER.post(() -> imageView.setImageBitmap(image));
} catch (IOException e) {
e.printStackTrace();
}
});
}
...
}
ImageLoader
类负责线程池的创建:
public class ImageLoader {
...
private static final int MAX_THREAD_NUM = 3;
private final ExecutorService mExecutorService;
private ImageLoader(Context context) {
//防止单例持有Activity的Context导致内存泄露
mContext = context.getApplicationContext();
mExecutorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);
}
public Dispatcher load(String url) {
return new Dispatcher(url, mExecutorService);
}
}
这样异步加载就实现完成了,测试一下:
public class MainActivity extends Activity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ImageView imageView = findViewById(R.id.image);
ImageLoader.with(this)
.load("https://i.redd.it/20mplvimm8ez.jpg")
.into(imageView);
}
}
效果图
线程池使用
当然我们今天的重点不在异步加载,而是在线程池上。
Executors
我们平时使用线程池只需要调用Executors.new**ThreadPool()
方法,甚至都不需要关心创建的类是什么。那今天就从Executors
入手去探寻线程池的庐山真面目:
public class Executors {
...
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
可以看到,平时使用最频繁的这几个方法基本都是直接创建了ThreadPoolExecutor
类,只是参数有所不同。唯一比较特殊的ScheduledThreadPoolExecutor
也继承自ThreadPoolExecutor
。
ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize
:线程池的核心线程数。当线程数小于corePoolSize
时会创建一个线程去执行任务,当线程数达到corePoolSize
时会将任务放入等待队列。如果没有手动调用核心线程超时,这些线程在创建后会一直存在。 -
maximumPoolSize
:线程池允许创建的最大线程数。在等待队列添加满之后,线程池会创建临时线程用来处理任务,临时队列在超时后会自动结束。而临时线程与核心线程的总数不能超过maximumPoolSize
。 -
keepAliveTime
:临时线程超时时间。 -
unit
:keepAliveTime
时间单位。 -
workQueue
:线程池的等待队列。当线程数达到corePoolSize
时会任务会被放入该等待队列。 -
threadFactory
:线程工厂。用于创建新线程。 -
handler
:饱和处理策略。当线程池关闭或者线程数达到maximumPoolSize
时,任务被放入该handler
。
在看完上面的一系列参数,可能还是一脸懵逼。其实大家可以线程池当做一个工厂,这个工厂负责对一些半成品进行加工,而核心线程就是这个工厂的工人。
每个工人同时只能处理一个半成品,多余的半成品就被放入仓库,等哪个工人处理完了手头的半成品再来仓库取,这个仓库就是等待队列。
但是仓库也有限制,随着半成品越来越多仓库也放不下了,这时候工厂就请来一些临时工来帮忙,等工厂的任务轻了之后再请他们回去,这些临时工就是临时线程。
可是工厂的资金也是有限的不能同时请太多的工人,这个资金限制就是maximumPoolSize,而这些既没有工人处理,仓库又放不下的半成品就要想个办法处理了。是直接把它们丢掉?还是退回给半成品厂商?这就是饱和策略需要决定的了。
等待队列
- 同步移交队列:任务不在队列中存储,而是直接交给工作线程。这时可以使用
SynchronousQueue
实现,该队列保证在插入时必须有另一个线程在等待获取,如果没有则插入失败。Executors.newCachedThreadPool()
使用的就是该队列。 - 无界队列:例如无界
LinkedBlockingQueue
。ExecutorService.newFixedThreadPool
和ExecutorService.newSingleThreadExecutor
使用的就是该队列。 - 有界队列:例如有界
LinkedBlockingQueue
和ArrayBlockingQueue
。有界队列避免了无界队列无限制的增加导致资源耗尽的问题。
饱和策略
这里很明显是策略模式,ThreadPoolExecutor
给我们提供了四个已经实现好的饱和策略,不过我们也可以选择自己实现:
-
AbortPolicy
:抛出RejectedExecutionException
。 -
CallerRunsPolicy
:将任务放到调用execute()
所在线程执行,也就是直接调用任务的run()
方法。 -
DiscardPolicy
:直接丢弃任务,不做任何处理。 -
DiscardOldestPolicy
:将等待队列头部的任务删除,再重新执行此任务。
套路
说了这么多,那到底应该怎么选择呢?其实只要大致遵循一个规律,如果是计算密集型的任务,线程池的大小设为CPU的数目加1通常是最优的,而如果是I/O密集型的任务就可以设置的大一些,比如2倍的CPU的数目。当然,具体的数目就要在运行过程中慢慢调试了。
线程池原理
讲完了线程池的使用,接下来就是线程池的原理了。这次的分析都基于Android 7.1.1的源码,其他版本的可能会在细节上有一些差异,不过大的方向不会有问题。
类结构
在了解ThreadPoolExecutor
的实现之前我们首先对类的继承结构要有一个整体的把握:
Executor
是Java提供的用于简化线程管理的接口,用户只需通过execute()
方法传入Runnable
的实现,由Executor
决定使用哪个线程处理同时负责线程的创建、运行和关闭。
ExecutorService
,这个是我们平时使用线程池最用的了,在Executor
的基础上又加入了submit()
、shutdown()
等等一些方便用户自主管理任务的方法。
AbstractExecutorService
类实现了submit()
等一系列方法,不过最主要的execute()
依然留给了子类也就是今天的主角ThreadPoolExecutor
去实现。
概览
ThreadPoolExecutor
实际上使用的是生产者/消费者模型,在分析具体的代码之前我们先看一下这个流程图,有一个大概的印象。
execute()
关于execute()
的处理过程,Java源码有很详细的注释,这里我把它翻译为中午供大家参考。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 三步走:
*
* 1. 如果当前运行的线程少于核心线程数,尝试开启一个线程并将command作为
* 它的第一个任务(作者注:这里的第一个任务在后面会有所解释)。调用
* runWorker通过原子性的方式重新检查线程池运行状态和工作线程数,如果
* 不能添加线程则返回false
*
* 2. 如果任务可以成功入列,我们依然要再次检查线程池是否已经关闭以及
* 是否需要添加一个新线程(因为存在一种情况是在上次检查之后所有的线程都已经
* 死光光了(作者注:至于为什么必须保证至少一个线程存活,我们在后面的
* runWorker方法中会找到答案))。如果线程池已经关闭,则将之前加入
* 队列的command弹出;如果已经没有线程存活,则添加一个新线程。
*
* 3. 如果不能将任务入列,我们会尝试添加一个新线程。如果失败了,要不
* 就是线程池已经关闭了,要不就是线程已经饱和了(作者注:线程数达到最大值),
* 这时候我们就将这个任务加入饱和策略。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这里有一点需要注意的是,这个ctl
变量的含义。其实ctl
就是一个保存了线程池运行状态以及线程数的原子整形变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
它的高3位存储线程池运行状态,即RUNNING
、SHUTDOWN
、STOP
、TIDYING
、TERMINATED
五个状态,低位存储运行的线程数,可以用isRunning()
判断线程池是否处于运行状态,用workerCountOf
获取运行的线程数。这里的ctl
的用法非常巧妙,强烈推荐大家去看一下源码,这里由于篇幅所限就不再多说了。
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
// 这一段for循环用来将ctl的值加1,如果线程池关闭或者线程数量
// 达到限制,则直接返回false。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程是否关闭或者已经没有需要执行的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 根据core判断当前线程数量是否已经达到限制
// 如果core为true,则线程数不能大于核心线程数
// 否则不能大于最大线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 将ctl的值加1,如果成功则跳出外循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 判断再次期间线程池状态是否已经发生改变,如果是则
// 重新开始外循环,否则在内循环中再次尝试对ctl值加1
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在得到主锁mainLock之后再次检查线程池的状态
// 如果已经关闭则不再添加
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 检查线程t能否启动
throw new IllegalThreadStateException();
// 将线程t加入线程集合
workers.add(w);
// 更新目前为止线程最多时达到的数目,与最大线程数
// 无关,调试用
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 正式开始运行线程t
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker()
方法很有意思,它用一个core
参数来区分是添加核心线程还是添加临时线程,一个方法可以有不同的功能。这也是为什么上面的流程图里,添加核心线程和临时线程的箭头上只有一个addWorker
方法。
而addWorker()
添加线程的逻辑可以分为四步:
- 确保线程数不超过限制,并将
ctl
的计数加1。 - 将
firstTask
封装为Worker
。 - 将
Worker
加入线程集合workers
。 - 启动
Worker
的线程。
有人已经注意到addWorker()
的参数名有点奇怪,明明只添加了一个任务为什么要叫firstTask
呢?在addWorker()
的代码里firstTask
传入了Worker
的构造器,后面一系列操作就都是相对Worker
执行的,那Worker
又对firstTask
做了什么?
Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
...
}
可以看到Worker
也继承了Runnable
接口,在构造方法里Worker
通过ThreadFactory
新开了一个线程,而传入的Runnable
却是自己,所以之前addWorker()
里的代码t.start()
最终执行的将会是Worker
的run()
方法,也就是runWorker()
。
主循环runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这里的逻辑初看起来可能是一头雾水,其实很简单。重点是这里的while循环,首先会判断firstTask
是否为空,如果不为空则分四步:
- 调用
beforeExecute()
方法。 - 调用
task
的run()
方法。 - 调用
afterExecute()
方法。 - 将
task
赋值为空。
下一次循环task
必定为空,于是执行task = getTask()
,这条语句是将Runnable
任务从等待队列workQueue
里取出来赋值给task
,于是再次执行上面四步,直到线程池关闭或者等待超时。
每个Worker
创建的线程在执行完属于自己的任务后,还会继续执行等待队列中的任务,所以这个firstTask
也可以当做每个线程的启动任务,这就是它为什么被叫做firstTask
的原因,也是runWorker
方法为什么被称为主循环的原因,线程池的设计者巧妙的用这一方法实现了线程的复用。
这也解答了之前的许多疑问:
- 为什么没有专门处理的等待队列的线程?原因就在于每个线程都是处理等待队列的线程。
- 为什么在
execute()
方法中将任务加入等待队列时,必须保证至少有一个线程存活?这是为了确保存在存活线程去执行等待队列中的任务。
总结
我们这次实现了图片的异步加载,不过将重点放在了线程池的使用及其原理上,设计者的各种巧思也是让我们叹为观止,大家如果有空可以自己尝试看一下源码,一定不会让你们失望。下一篇文章我们将要讲解的是Handler,敬请期待。
网友评论