美文网首页
Android线程池浅析

Android线程池浅析

作者: 落叶的位置丶 | 来源:发表于2017-05-10 18:29 被阅读0次

    引言

    在Android开发中,只要是耗时的操作都需要开启一个线程来执行。例如网络访问必须放到子线程中执行,否则会抛异常(NetworkOnMainThreadException),这样做的目的也是为了防止用户在主线程中做耗时操作,这样很容易引起ANR。那么有了线程为什么还需要线程池呢?

    线程的创建过程分为3步

    • 创建线程 T1
    • 执行线程 T2
    • 销毁线程 T2

    线程创建的总时间T=T1+T2+T3。 可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。

    合理利用线程池能够带来三个好处。

    1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

    线程池的使用

    首先我们来看下Java中线程池的使用。

    Executor

    Java中线程池基于Executor接口。Executor 接口中定义了一个方法 void execute(Runnable command),该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。

    ExecutorService

    ExecutorService 继承自 Executor 接口,它提供了更丰富的实现多线程的方法。

    • shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致 ExecutorService 停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭 ExecutorService。因此我们一般用该接口来实现和管理多线程。
    • submit(Runnable task)方法与Executor的execute()方法效果一样,但是submit有返回值,如果我希望task执行完后,每个task告诉我它的执行结果,是成功还是失败,如果是失败,原因是什么,这时候就需要用到submit,通过返回的Future来获取到结果

    ThreadPoolExecutor

    ThreadPoolExecutor 是正真线程池的实现类,我们创建线程的时候一般也是使用这个类。先看下构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    
    • corePoolSize 线程池的核心线程数,也可以理解为最小线程数,默认情况下,核心线程在线程池中是一直存活的,即使当前执行的线程数量小于核心线程数量,所有的核心线程均会被创建出来。如果将ThreadPoolExecutor的allowCoreThreadTimeOut设置为ture,那么核心线程就会有超时策略。
    • maximumPoolSize 线程池所能容纳的最大线程数,包括核心和非核心线程,当活动线程到达最大值,后续的任务就会被阻塞。
    • keepAliveTime 非核心线程闲置时的超时时长,当非核心线程空闲时间超过这个值,就会被终止。如果ThreadPoolExecutor的allowCoreThreadTimeOut设置为true,这个时间同样会作用在核心线程上;
    • unit 时间单位
    • workQueue 线程池中的任务队列,所有提交的Runable对象会存储在这个参数中
    • threadFactory 创建新线程时使用的factory
    • handler 当task出现异常时,会通过这个handler通知外界

    那么线程池针对不同的线程数量又是怎么执行的呢,主要可以分为以下几种情况

    • 线程数 < corePoolSize 所有核心线程都会用来处理任务。
    • 线程数 = corePoolSize 缓冲队列workQueue未满,任务被放入缓冲队列。
    • corePoolSize < 线程数 < maximumPoolSize 缓冲队列workQueue满,创建新的线程来处理被添加的任务。
    • corePoolSize < maximumPoolSize < 线程数 缓冲队列workQueue满,通过 handler所指定的策略来处理此任务。

    处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 handler又有以下四种选择

    • CallerRunsPolicy 再次添加该task并执行execute方法
    • AbortPolicy 放弃该task并抛出RejectedExecutionException
    • DiscardPolicy 放弃当前task
    • DiscardOldestPolicy 放弃等待时间最久的task

    线程池的进一步封装

    看了ThreadPoolExecutor的简单介绍是不是觉得使用过程比较复杂呢,没关系,Java针对这一点还可以通过Executors的工厂方法配置,handler均使用的是defaultHandler即AbortPolicy只要线程数量大于核心线程数就会抛出异常需要自行处理。主要有以下五种:

    • newFixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    }
    

    核心线程数=最大线程数,并且没有超时策略。也就是说该线程池没有空闲状态,能最快速度响应。

    • newSingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    }
    

    核心线程数=最大线程数=1,没有超时策略。所有任务在一个线程中处理,不存在同步问题。handler为AbortPolicy。

    • newCachedThreadPool
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }
    

    核心线程=0 ,最大线程=Integer.MAX_VALUE,这就意味着没有线程数量的限制,超时时间为60毫秒。new SynchronousQueue<Runnable>这个任务队列比较特殊,简单理解为一个无法存储的任务队列。也就是说,当有新的任务到来,如果有空闲的线程,则将任务给空闲的线程处理,否则直接创建一个新的线程来处理任务。也就是说一旦有任务来就会立刻执行,但是消耗较大,所以比较适合用来处理大量的耗时较短的任务。

    • newScheduledThreadPool
    private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                new DelayedWorkQueue());
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
    }
    

    核心线程为自定义 ,最大线程=Integer.MAX_VALUE,超时时间为10毫秒,也就是说一旦线程空闲便立即回收。适用于定时任务与周期性任务。

    • newWorkStealingPool (Java8中新引入)
    public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
    }
    

    这个线程池和其它四个不太一样,是Java8中新引入的。线程数默认为主机CPU的可用核心数,且会动态的增加与减少,提交的任务执行顺序并不能得到保证。

    Android中的线程池

    了解了Java提供的线程池我们就来看下Android中是怎样使用线程池的。

    AsyncTask是Android中异步处理的轻量级框架,其中的实现就是一个线程池。

    public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    postResult(result);
                }
                return result;
            }
        };
    
        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }
    

    在构造方法中初始化了两个变量mWorker与mFuture,这两个变量很重要

    • mWorker是一个WorkerRunnable
    private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }
    

    WorkerRunnable是一个实现了Callable接口的类。

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    Callable接口里面只定义了一个call方法,返回一个泛型对象。那么Callable到底有什么用呢?线程无论继承Thread类还是实现Runnable方法,执行完任务以后都无法直接返回结果。而Callable接口就弥补了这个缺陷,当call方法执行完毕以后会返回一个泛型对象。WorkerRunnable实现了Callable接口,也就是说我们可以调用mWorker.call()来获取返回的结果

    • mFuture是一个FutureTask
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    

    参数是一个Callable,也就是mWorker。FutureTask实现了RunnableFuture接口。

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    

    这个接口又继承了Runable与Future<V>。
    Future<V>接口主要是针对Runnable和Callable任务的。
    提供了三种功能:

    1. 判断任务是否完成 boolean isDone()
    2. 能够中断任务 boolean cancel(boolean mayInterruptIfRunning);
    3. 能够获取任务执行的结果 V get();

    也就是说FutureTask既能够被线程执行,又能提供线程执行任务后返回的结果。

    看完了构造方法我们从execute()入手

    public final AsyncTask<Params, Progress, Result> execute(Params... params)
    {
        return executeOnExecutor(sDefaultExecutor, params);
    }
    

    在执行execute方法时传入了初始化时的参数与sDefaultExecutor

    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;
    
        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }
    
        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
    

    sDefaultExecutor就是一个实现了Executor的基本类,execute中,在一个双端队列中不断的插入Runable对象。ArrayDeque的内部是使用数组形式来实现双端队列的,我们知道队列是FIFO的,只能在队头删除元素,队尾添加元素,而双端队列是在队头和队尾都能够删除和添加元素。需要注意的是ArrayDeque没有容量的限制,队列满了以后会自动进行扩充。

    THREAD_POOL_EXECUTOR则是AsyncTask中的关键线程池

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE_SECONDS = 30;
    
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
            }
        };
    
    public static final Executor THREAD_POOL_EXECUTOR;
    
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
                
    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    

    AsyncTask在静态代码块中初始化了一个ThreadPoolExecutor,核心线程数=[2,4/CPU核心数-1],最大线程数=CPU核心数*2+1,超时时间为30秒,队列长度为128,handler为AbortPolicy。并且设置了allowCoreThreadTimeOut(true),即核心线程超时可被回收。

    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
            return executeOnExecutor(sDefaultExecutor, params);
    }
    
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
                Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }
    
        mStatus = Status.RUNNING;
    
        onPreExecute();
    
        mWorker.mParams = params;
        exec.execute(mFuture);
    
        return this;
    }
    

    在Asynck调用execute()时,mWorker获取到参数,sDefaultExecutor开始执行execute方法。

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
    

    回过来看,第一次执行时,mActive=null这时候便会执行scheduleNext(),当队列中有数据的时候便会把数据取出来放进线程池中执行。在执行THREAD_POOL_EXECUTOR.execute(mActive)时,因为mFuture实现了Runable接口,这个时候便会调用run方法

    public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    

    其中的callable就是mWorker,此时会调用mWorker的call()方法

    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
    
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            return postResult(doInBackground(mParams));
        }
    };
    

    这个就是在构造方法中创建的,在call中实现了doInBackground方法,并把返回值给了postResult,postResult中的逻辑就是获取一个Message,然后发送该Message给Handler处理。

    简单回顾下,AsyncTask每次将doInBackground()中的操作添加到线程池中执行,执行完后由handler传递数据。

    总结

    在Android开发中对于线程池的使用可能仅仅只是一些异步的框架中有封装,我们很少真正使用,仅仅new一个Thread,对于一些频繁的请求与周期性的操作不如尝试试用下线程池。

    举个例子:
    我们需要每隔10秒进行某个操作,这时你可能会考虑用一个定时器,每次开一个线程去请求,这样T1与T2的时间开销是很大的,此时使用线程池便能大大优化性能。

    如有错误的地方欢迎大家留言指正探讨。

    相关文章

      网友评论

          本文标题:Android线程池浅析

          本文链接:https://www.haomeiwen.com/subject/jxigtxtx.html