美文网首页
Android 线程池ThreadPoolExecutor详解

Android 线程池ThreadPoolExecutor详解

作者: Bfmall | 来源:发表于2021-04-07 10:20 被阅读0次

    前言
    多线程并发是我们在开发中经常遇到的问题,提及线程池,首先我们得了解线程的相关知识。关于线程的详情介绍本文就不提及了,有不太清楚的朋友可以自行查找相关资料,下面简要概述一下进程和线程的概念,为后续内容(线程池)做铺垫。

    进程:
    每个app运行时前首先创建一个进程,该进程是由Zygote fork出来的,用于承载App上运行的各种Activity/Service等组件。
    进程对于上层应用来说是完全透明的,这也是google有意为之,让App程序都是运行在Android Runtime。大多数情况一个App就运行在一个进程中,除非在AndroidManifest.xml中配置Android:process属性,或通过native代码fork进程。

    线程:
    线程对应用来说非常常见,比如每次new Thread().start都会创建一个新的线程。该线程与App所在进程之间资源共享,从Linux角度来说进程与线程除了是否共享资源外,并没有本质的区别,都是一个task_struct结构体,在CPU看来进程或线程无非就是一段可执行的代码,CPU采用CFS调度算法,保证每个task都尽可能公平的享有CPU时间片。

    上述引自 :知乎 Gityuan的回答

    本文就以下几个问题展开讲解:

    1.线程池的基本概念。
    2.采用线程池的优势。
    3.Android 中常用的几种线程池。
    4.如何终止某个线程任务。
    一、关于线程池
    Android中的线程池的概念来源于Java中的Executor,它们的使用基本是一致的。Executor是一个接口,真正的线程池的实现为ThreadPoolExecutor。ThreadPoolExecutor提供了一系列参数来配置线程池,Android中常用的几种线程池都是通过对ThreadPoolExecutor进行不同配置来实现的。

    ThreadPoolExecutor 构造方法
    ThreadPoolExecutor 有多个重载方法,但最终都调用了这个构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                                 int maximumPoolSize,
                                 long keepAliveTime,
                                 TimeUnit unit,
                       BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory)
    

    我们可以看到,这个构造方法里一共有7个参数,其参数的含义如下:

    corePoolSize: 线程池中核心线程的数量,能够同时执行的任务数量。

    maximumPoolSize:除去缓冲队列中等待的任务,最大能容纳的任务数(其实是包括了核心线程池数量)。

    keepAliveTime: 非核心线程超出workQueue的等待任务的存活时间,就是指maximumPoolSize里面的等待任务的存活时间。当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长。

    unit: keepAliveTime这个参数的单位,有纳秒、微秒、毫秒、秒、分、时、天等。

    workQueue: 阻塞等待线程的队列,一般使用new LinkedBlockingQueue<Runnable>()这个,如果不指定容量,会一直往里边添加,没有限制,workQueue永远不会满,一般选择没有容量上限的队列。该队列主要用来存储已经被提交但是尚未执行的任务。存储在这里的任务是由ThreadPoolExecutor的execute方法提交来的。

    threadFactory: 为线程池提供创建新线程的功能,这个我们一般使用默认即可。

    handler: 当任务数超过maximumPoolSize时,对任务的处理策略,默认策略是拒绝添加
    执行流程:当线程数小于corePoolSize时,每添加一个任务,则立即开启线程执行
    当corePoolSize满的时候,后面添加的任务将放入缓冲队列workQueue等待;
    当workQueue也满的时候,看是否超过maximumPoolSize线程数,如果超过,默认拒绝执行,如果没有超过,则创建线程立即执行。默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。

    举例说明:
    假如:corePoolSize=2,maximumPoolSize=3,workQueue容量为8;
    最开始,执行的任务A,B,此时corePoolSize已用完,再次执行任务C,则
    C将被放入缓冲队列workQueue中等待着,如果后来又添加了7个任务,此时workQueue已满,
    则后面再来的任务将会和maximumPoolSize比较,由于maximumPoolSize为3
    因为有2个在corePoolSize中运行了,所以只能容纳1个了,那么会立即创建线程执行。那么后面来的任务默认都会被拒绝--通常都会报异常。

    线程池创建:

    executor = new ThreadPoolExecutor(
                corePoolSize, //3
                maximumPoolSize,//5,当缓冲队列满,但是未达到最大线程数,创建线程立即执行,否则报异常。 
                keepAliveTime,  //最大线程数中的线程执行完后,会继续等待一段时间。
                unit,   //等待时间的单位
                new LinkedBlockingQueue<Runnable>(),//缓冲队列,超出核心线程池的任务会被放入缓存队列中等待
                Executors.defaultThreadFactory(),//创建线程的工厂类
                new ThreadPoolExecutor.AbortPolicy()//当最大线程池也超出的时候,则拒绝执行
                );
    

    两个执行的方法
    ThreadPoolExecutor有两个方法可以供我们执行,分别是submit()和execute(),我们先来看看这两个方法到底有什么差异

    execute()方法源码:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            //获得当前线程的生命周期对应的二进制状态码
            int c = ctl.get();
            //判断当前线程数量是否小于核心线程数量,如果小于就直接创建核心线程执行任务,创建成功直接跳出,失败则接着往下走.
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //判断线程池是否为RUNNING状态,并且将任务添加至队列中.
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //审核下线程池的状态,如果不是RUNNING状态,直接移除队列中
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                    //如果当前线程数量为0,则单独创建线程,而不指定任务.
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果不满足上述条件,尝试创建一个非核心线程来执行任务,如果创建失败,调用reject()方法.
            else if (!addWorker(command, false))
                reject(command);
        }
    

    submit()方法源码:

    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            //还是通过调用execute
            execute(ftask);
            //最后会将包装好的Runable返回
            return ftask;
        }
    
    //将Callable<T> 包装进FutureTask中
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
    //可以看出FutureTask也是实现Runnable接口,因为RunableFuture本身就继承了Runnabel接口
    public class FutureTask<V> implements RunnableFuture<V> {
        .......
    }
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    

    从上面两个方法的源码我们可以分析出几个结论,

    submit()其实还是需要调用execute()去执行任务的,不同是submit()将包装好的任务进行了返回,他会返回一个Future对象。
    从execute()方法中,不难看出addWorker()方法, 是创建线程(核心线程,非核心线程)的主要方法,而reject()方法为线程创建失败的回调。
    所以,通常情况下,在不需要线程执行返回结果值时,我们使用execute 方法。 而当我们需要返回值时,则使用submit方法,他会返回一个Future对象。Future不仅仅可以获得一个结果,他还可以被取消,我们可以通过调用future的cancel()方法,取消一个Future的执行。 比如我们加入了一个线程,但是在这过程中我们又想中断它,则可通过sumbit 来实现。

    二、采用线程池的优势?

    1. 避免线程频繁创建消毁。
      虽然采用Thread 创建线程可以实现耗时操作,但线程的大量创建和销毁,会造成过大的性能开销。

    2.避免系统资源紧张。
    当大量的线程一起运作的时候,可能会造成资源紧张,上面也介绍过线程底层的机制就是切分CPU的时间,而大量的线程同时存在时可能造成互相抢占资源的现象发生,从而导致阻塞的现象。

    3.更好地管理线程。
    以下载功能为例,一般情况下,会有限制最大并发下载数目,而利用线程池我们可以灵活根据实际需求来设置同时下载的最大量、串行执行下载任务顺序、实现队列等待等功能。

    三、Android 中常用的几种线程池。
    3.1 FixedThreadPool
    它的源码如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    从源码我们可以看出两个特征:

    1.它只有一个传入参数,即固定核心线程数

    它只提供了一个nThreads,供外部传入进来,并且它的核心线程数和最大线程数是一样的。这说明在FixedThreadPool中没有非核心线程,所有的线程都是核心线程。

    1. 线程的超时时间为0。

    这说明核心线程即使在没有任务可执行的时候,也不会被销毁,这样可让FixedThreadPool更快速的响应请求。最后的线程队列是一个LinkedBlockingQueue,但是LinkedBlockingQueue却没有参数,这说明线程队列的大小为Integer.MAX_VALUE(2^31 - 1)

    从以上源码参数我们对FixedThreadPool的工作特点应该也有大体的理解了,接下来我们继续分析其他几个线程池。

    3.2 SingleThreadExecutor
    它的源码如下:

    public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1,  
                                    0L, TimeUnit.MILLISECONDS,  
                                    new LinkedBlockingQueue<Runnable>()));  
    
    

    从源码我们可以很容易发现 SingleThreadExecutor和FixedThreadPool很像,不同的是SingleThreadExecutor的核心线程数只有1, 也就是只能同时有一个线程被执行。使用它可以避免我们处理线程同步问题。
    打个比喻,它就类似于排队买票,一次只能同时处理一个人的事务。其实如果我们把FixedThreadPool的参数传为1,就和SingleThreadExecutor的作用一致了。

    3.3 CachedThreadPool
    它的源码如下:

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    从源码可以看到,CachedThreadPool中是没有核心线程的,但是它的最大线程数却为Integer.MAX_VALUE,另外,CachedThreadPool是有线程超时机制的,它的超时时间为60秒。

    由于最大线程数为无限大,所以每当添加一个新任务进来的时候,如果线程池中有空闲的线程,则由该空闲的线程执行新任务;如果没有空闲线程,则创建新线程来执行任务。

    根据CachedThreadPool的特点,在有大量耗时短的任务请求时,可使用CachedThreadPool,因为当CachedThreadPool中没有新任务的时候,它里边所有的线程都会因为60秒超时而被终止。

    3.4 ScheduledThreadPool
    它的源码如下:

    public ScheduledThreadPoolExecutor(int corePoolSize) {  
        super(corePoolSize, Integer.MAX_VALUE,  
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,  
              new DelayedWorkQueue());  
    } 
    

    从源码可以看出,它的核心线程数量是固定的,但是非核心线程无穷大。当非核心线程闲置时,则会被立即回收。
    ScheduledThreadPool也是四个当中唯一一个具有定时定期执行任务功能的线程池。它适合执行一些周期性任务或者延时任务。

    延时启动任务示例:

    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
                Runnable runnable = new Runnable(){
                    @Override
                    public void run() {
                        //TODO method();
                    }
                };
            
            //延迟一秒执行
            scheduledExecutorService.schedule(runnable, 1, TimeUnit.SECONDS);
    

    延时周期启动任务示例:

    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
                Runnable runnable = new Runnable(){
                    @Override
                    public void run() {
                        //TODO method();
                    }
                };
            //延迟三秒后,执行周期一秒的定时任务
            scheduledExecutorService.scheduleAtFixedRate(runnable, 3, 1, TimeUnit.SECONDS);
    

    四、如何终止线程池中的某个线程任务?
    一般线程执行完run方法之后,线程就正常结束了,因此有如下几种方式来实现:

    4.1 利用 Future 和 Callable。
    步骤:

    实现 Callable 接口
    调用 pool.submit() 方法,返回 Future 对象
    用 Future 对象来获取线程的状态。

    private void cancelAThread() {
            ExecutorService pool = Executors.newFixedThreadPool(2);
              
              Callable<String> callable = new Callable<String>() {
                  
                @Override
                public String call() throws Exception {
                    System.out.println("test");
                    return "true";
                }
            };
              
            Future<String> f = pool.submit(callable);
              
            System.out.println(f.isCancelled());
            System.out.println(f.isDone());
            f.cancel(true);
      
        }
    

    关于 Future 和 Callable 的介绍,推荐看这篇文章,内容很详细: 《Android并发编程之白话文详解Future,FutureTask和Callable》

    4.2 利用 volatile 关键字,设置退出flag, 用于终止线程。

    public class ThreadSafe extends Thread {
        public volatile boolean isCancel = false; 
            public void run() { 
            while (!isCancel){
               //TODO method(); 
            }
        } 
    }
    

    4.3 interrupt()方法终止线程,并捕获异常。

    public class ThreadSafe extends Thread {
      
       @Override
        public void run() { 
            while (!isInterrupted()){ //非阻塞过程中通过判断中断标志来退出
                try{
                  //TODO method(); 
                  //阻塞过程捕获中断异常来退出
                }catch(InterruptedException e){
                    e.printStackTrace();
                    break;//捕获到异常之后,执行break跳出循环。
                }
            }
        } 
    }
    

    参考文献:

    书籍 -《Android 开发艺术探索》(作者:任玉刚)
    CSDN 博客 - 《Android并发编程之白话文详解Future,FutureTask和Callable》
    CSDN 博客 - 《深入理解在Android中线程池的使用》

    相关文章

      网友评论

          本文标题:Android 线程池ThreadPoolExecutor详解

          本文链接:https://www.haomeiwen.com/subject/gyhekltx.html