美文网首页Android开发经验谈Android知识程序员
从零实现ImageLoader(三)—— 线程池详解

从零实现ImageLoader(三)—— 线程池详解

作者: GavinLi369 | 来源:发表于2017-08-23 19:04 被阅读364次

目录

从零实现ImageLoader(一)—— 架构
从零实现ImageLoader(二)—— 基本实现
从零实现ImageLoader(三)—— 线程池详解
从零实现ImageLoader(四)—— Handler的内心独白
从零实现ImageLoader(五)—— 内存缓存LruCache
从零实现ImageLoader(六)—— 磁盘缓存DiskLruCache

异步加载

既然是异步加载那新开线程自然是必不可少的。一个线程怎么样?这种情况下图片得一个一个依次加载,效率未免太低了。那每张图片新开一个线程怎么样?在图片过多的情况下,线程数量也会迅速随之增长,系统资源消耗太多严重,也不能接受。这时候就是线程池ExecutorService这个线程管理工具登场的时候了。

public class Dispatcher {
    private final String mUrl;
    private final ExecutorService mExecutorService;

    public Dispatcher(String url, ExecutorService executorService) {
        mUrl = url;
        mExecutorService = executorService;
    }

    public void into(ImageView imageView) {
        mExecutorService.execute(() -> {
            try {
                Bitmap image = get();
                //这一句将代码切换到主线程,下一篇文章再详细解释
                ImageLoader.HANDLER.post(() -> imageView.setImageBitmap(image));
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

    ...
}

ImageLoader类负责线程池的创建:

public class ImageLoader {
    ...
    
    private static final int MAX_THREAD_NUM = 3;
    private final ExecutorService mExecutorService;

    private ImageLoader(Context context) {
        //防止单例持有Activity的Context导致内存泄露
        mContext = context.getApplicationContext();
        mExecutorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);
    }

    public Dispatcher load(String url) {
        return new Dispatcher(url, mExecutorService);
    }
}

这样异步加载就实现完成了,测试一下:

public class MainActivity extends Activity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        ImageView imageView = findViewById(R.id.image);
        ImageLoader.with(this)
                .load("https://i.redd.it/20mplvimm8ez.jpg")
                .into(imageView);
    }
}
效果图

线程池使用

当然我们今天的重点不在异步加载,而是在线程池上。

Executors

我们平时使用线程池只需要调用Executors.new**ThreadPool()方法,甚至都不需要关心创建的类是什么。那今天就从Executors入手去探寻线程池的庐山真面目:

public class Executors {
    ...

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

可以看到,平时使用最频繁的这几个方法基本都是直接创建了ThreadPoolExecutor类,只是参数有所不同。唯一比较特殊的ScheduledThreadPoolExecutor也继承自ThreadPoolExecutor

ThreadPoolExecutor构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:线程池的核心线程数。当线程数小于corePoolSize时会创建一个线程去执行任务,当线程数达到corePoolSize时会将任务放入等待队列。如果没有手动调用核心线程超时,这些线程在创建后会一直存在。
  • maximumPoolSize:线程池允许创建的最大线程数。在等待队列添加满之后,线程池会创建临时线程用来处理任务,临时队列在超时后会自动结束。而临时线程与核心线程的总数不能超过maximumPoolSize
  • keepAliveTime:临时线程超时时间。
  • unitkeepAliveTime时间单位。
  • workQueue:线程池的等待队列。当线程数达到corePoolSize时会任务会被放入该等待队列。
  • threadFactory:线程工厂。用于创建新线程。
  • handler:饱和处理策略。当线程池关闭或者线程数达到maximumPoolSize时,任务被放入该handler

在看完上面的一系列参数,可能还是一脸懵逼。其实大家可以线程池当做一个工厂,这个工厂负责对一些半成品进行加工,而核心线程就是这个工厂的工人

每个工人同时只能处理一个半成品,多余的半成品就被放入仓库,等哪个工人处理完了手头的半成品再来仓库取,这个仓库就是等待队列

但是仓库也有限制,随着半成品越来越多仓库也放不下了,这时候工厂就请来一些临时工来帮忙,等工厂的任务轻了之后再请他们回去,这些临时工就是临时线程

可是工厂的资金也是有限的不能同时请太多的工人,这个资金限制就是maximumPoolSize,而这些既没有工人处理,仓库又放不下的半成品就要想个办法处理了。是直接把它们丢掉?还是退回给半成品厂商?这就是饱和策略需要决定的了。

等待队列

  • 同步移交队列:任务不在队列中存储,而是直接交给工作线程。这时可以使用SynchronousQueue实现,该队列保证在插入时必须有另一个线程在等待获取,如果没有则插入失败。Executors.newCachedThreadPool()使用的就是该队列。
  • 无界队列:例如无界LinkedBlockingQueueExecutorService.newFixedThreadPoolExecutorService.newSingleThreadExecutor使用的就是该队列。
  • 有界队列:例如有界LinkedBlockingQueueArrayBlockingQueue。有界队列避免了无界队列无限制的增加导致资源耗尽的问题。

饱和策略

这里很明显是策略模式,ThreadPoolExecutor给我们提供了四个已经实现好的饱和策略,不过我们也可以选择自己实现:

  • AbortPolicy:抛出RejectedExecutionException
  • CallerRunsPolicy:将任务放到调用execute()所在线程执行,也就是直接调用任务的run()方法。
  • DiscardPolicy:直接丢弃任务,不做任何处理。
  • DiscardOldestPolicy:将等待队列头部的任务删除,再重新执行此任务。

套路

说了这么多,那到底应该怎么选择呢?其实只要大致遵循一个规律,如果是计算密集型的任务,线程池的大小设为CPU的数目加1通常是最优的,而如果是I/O密集型的任务就可以设置的大一些,比如2倍的CPU的数目。当然,具体的数目就要在运行过程中慢慢调试了。

线程池原理

讲完了线程池的使用,接下来就是线程池的原理了。这次的分析都基于Android 7.1.1的源码,其他版本的可能会在细节上有一些差异,不过大的方向不会有问题。

类结构

在了解ThreadPoolExecutor的实现之前我们首先对类的继承结构要有一个整体的把握:

类结构

Executor是Java提供的用于简化线程管理的接口,用户只需通过execute()方法传入Runnable的实现,由Executor决定使用哪个线程处理同时负责线程的创建、运行和关闭。

ExecutorService,这个是我们平时使用线程池最用的了,在Executor的基础上又加入了submit()shutdown()等等一些方便用户自主管理任务的方法。

AbstractExecutorService类实现了submit()等一系列方法,不过最主要的execute()依然留给了子类也就是今天的主角ThreadPoolExecutor去实现。

概览

ThreadPoolExecutor实际上使用的是生产者/消费者模型,在分析具体的代码之前我们先看一下这个流程图,有一个大概的印象。

流程图

execute()

关于execute()的处理过程,Java源码有很详细的注释,这里我把它翻译为中午供大家参考。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 三步走:
         *
         * 1. 如果当前运行的线程少于核心线程数,尝试开启一个线程并将command作为
         * 它的第一个任务(作者注:这里的第一个任务在后面会有所解释)。调用
         * runWorker通过原子性的方式重新检查线程池运行状态和工作线程数,如果
         * 不能添加线程则返回false
         *
         * 2. 如果任务可以成功入列,我们依然要再次检查线程池是否已经关闭以及
         * 是否需要添加一个新线程(因为存在一种情况是在上次检查之后所有的线程都已经
         * 死光光了(作者注:至于为什么必须保证至少一个线程存活,我们在后面的
         * runWorker方法中会找到答案))。如果线程池已经关闭,则将之前加入
         * 队列的command弹出;如果已经没有线程存活,则添加一个新线程。
         *
         * 3. 如果不能将任务入列,我们会尝试添加一个新线程。如果失败了,要不
         * 就是线程池已经关闭了,要不就是线程已经饱和了(作者注:线程数达到最大值),
         * 这时候我们就将这个任务加入饱和策略。
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里有一点需要注意的是,这个ctl变量的含义。其实ctl就是一个保存了线程池运行状态以及线程数的原子整形变量:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

它的高3位存储线程池运行状态,即RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED五个状态,低位存储运行的线程数,可以用isRunning()判断线程池是否处于运行状态,用workerCountOf获取运行的线程数。这里的ctl的用法非常巧妙,强烈推荐大家去看一下源码,这里由于篇幅所限就不再多说了。

addWorker()

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 这一段for循环用来将ctl的值加1,如果线程池关闭或者线程数量
        // 达到限制,则直接返回false。
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判断线程是否关闭或者已经没有需要执行的任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 根据core判断当前线程数量是否已经达到限制
                // 如果core为true,则线程数不能大于核心线程数
                // 否则不能大于最大线程数
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 将ctl的值加1,如果成功则跳出外循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 判断再次期间线程池状态是否已经发生改变,如果是则
                // 重新开始外循环,否则在内循环中再次尝试对ctl值加1
                c = ctl.get(); 
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 在得到主锁mainLock之后再次检查线程池的状态
                    // 如果已经关闭则不再添加
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 检查线程t能否启动
                            throw new IllegalThreadStateException();
                        // 将线程t加入线程集合
                        workers.add(w);
                        // 更新目前为止线程最多时达到的数目,与最大线程数
                        // 无关,调试用
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 正式开始运行线程t
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker()方法很有意思,它用一个core参数来区分是添加核心线程还是添加临时线程,一个方法可以有不同的功能。这也是为什么上面的流程图里,添加核心线程和临时线程的箭头上只有一个addWorker方法。

addWorker()添加线程的逻辑可以分为四步:

  1. 确保线程数不超过限制,并将ctl的计数加1。
  2. firstTask封装为Worker
  3. Worker加入线程集合workers
  4. 启动Worker的线程。

有人已经注意到addWorker()的参数名有点奇怪,明明只添加了一个任务为什么要叫firstTask呢?在addWorker()的代码里firstTask传入了Worker的构造器,后面一系列操作就都是相对Worker执行的,那Worker又对firstTask做了什么?

Worker

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        ...
    }

可以看到Worker也继承了Runnable接口,在构造方法里Worker通过ThreadFactory新开了一个线程,而传入的Runnable却是自己,所以之前addWorker()里的代码t.start()最终执行的将会是Workerrun()方法,也就是runWorker()

主循环runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这里的逻辑初看起来可能是一头雾水,其实很简单。重点是这里的while循环,首先会判断firstTask是否为空,如果不为空则分四步:

  1. 调用beforeExecute()方法。
  2. 调用taskrun()方法。
  3. 调用afterExecute()方法。
  4. task赋值为空。

下一次循环task必定为空,于是执行task = getTask(),这条语句是将Runnable任务从等待队列workQueue里取出来赋值给task,于是再次执行上面四步,直到线程池关闭或者等待超时。

每个Worker创建的线程在执行完属于自己的任务后,还会继续执行等待队列中的任务,所以这个firstTask也可以当做每个线程的启动任务,这就是它为什么被叫做firstTask的原因,也是runWorker方法为什么被称为主循环的原因,线程池的设计者巧妙的用这一方法实现了线程的复用。

这也解答了之前的许多疑问:

  • 为什么没有专门处理的等待队列的线程?原因就在于每个线程都是处理等待队列的线程。
  • 为什么在execute()方法中将任务加入等待队列时,必须保证至少有一个线程存活?这是为了确保存在存活线程去执行等待队列中的任务。

总结

我们这次实现了图片的异步加载,不过将重点放在了线程池的使用及其原理上,设计者的各种巧思也是让我们叹为观止,大家如果有空可以自己尝试看一下源码,一定不会让你们失望。下一篇文章我们将要讲解的是Handler,敬请期待。

相关文章

网友评论

    本文标题:从零实现ImageLoader(三)—— 线程池详解

    本文链接:https://www.haomeiwen.com/subject/igzkdxtx.html