美文网首页
android AsyncTask 源码分析

android AsyncTask 源码分析

作者: Yapple | 来源:发表于2021-05-27 11:41 被阅读0次
    使用

    AsyncTask的使用不是本文的重点,我相信读者也不是来了解这个的,所以就先简单的介绍下它的使用,帮助大家回忆。
    AsyncTask的实现:

    public class DownloadTask extends AsyncTask<Object, Integer, Boolean> {
    
        /**
         * 在执行任务之前调用,运行在主线程,可用来做布局更新等
         */
        @Override
        protected void onPreExecute() {
            super.onPreExecute();
        }
    
        /**
         * 执行后台任务,运行在子线程
         */
        @Override
        protected Boolean doInBackground(Object... objects) {
            return false;
        }
    
        /**
         * 更新进度,运行在主线程,在doInBackground方法中调用publishProgress(Integer... values)会自动调用该方法
         */
        @Override
        protected void onProgressUpdate(Integer... values) {
            super.onProgressUpdate(values);
        }
    
        /**
         * 后台任务结束后调用,运行在主线程,参数aBoolean是doInBackground()方法的返回值
         */
        @Override
        protected void onPostExecute(Boolean aBoolean) {
            super.onPostExecute(aBoolean);
        }
    
        /**
         * 主动调用cancel()方法,或者doInBackground()方法抛出异常
         */
        @Override
        protected void onCancelled() {
            super.onCancelled();
        }
    }
    

    调用:

        DownloadTask downloadTask = new DownloadTask();
        downloadTask.execute(object1, object2);
        //这里的参数object1, object2最终会传给doInBackground(Object... objects);
    
    源码思路

    先说下整体的实现思路:创建线程池执行耗时任务,通过handler将进度、结果等送回主线程。

    细节方面,我们需要研究下,该线程池是怎样的线程池,它的执行逻辑是怎样的:核心线程数,最大线程数,等待队列,拒绝策略等细节。顺便再了解下上面代码中的onPreExecute(),onProgressUpdate()等生命周期是在什么时候执行的。

    开始研究

    我们就从我们使用的地方开始下手:
    DownloadTask downloadTask = new DownloadTask();
    downloadTask.execute(object1, object2);

    1.下面是其构造方法:
    public AsyncTask(@Nullable Looper callbackLooper) {
            mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
                ? getMainHandler()
                : new Handler(callbackLooper);
    
            mWorker = new WorkerRunnable<Params, Result>() {
                public Result call() throws Exception {
                    //设置标志,表示工作任务开始执行
                    mTaskInvoked.set(true);
                    Result result = null;
                    try {
                        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                        //noinspection unchecked
                        result = doInBackground(mParams);
                        Binder.flushPendingCommands();
                    } catch (Throwable tr) {
                        mCancelled.set(true);
                        throw tr;
                    } finally {
                        postResult(result);
                    }
                    return result;
                }
            };
    
            mFuture = new FutureTask<Result>(mWorker) {
                @Override
                protected void done() {
                    try {
                        postResultIfNotInvoked(get());
                    } catch (InterruptedException e) {
                        android.util.Log.w(LOG_TAG, e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException("An error occurred while executing doInBackground()",
                                e.getCause());
                    } catch (CancellationException e) {
                        postResultIfNotInvoked(null);
                    }
                }
            };
        }
    

    结构很简单,一共初始化了三个全局变量mHandler,mWorker,mFuture。

    mHandler,从上面的代码看,是创建了一个绑定callbackLooper的handler,所以讲道理如果callbackLooper是另一个子线程的Looper对象,它也可是实现两个子线程之间的数据交互。但是,由于该构造方法被@hide隐藏了,我们使用的默认无参构造器,最终会使callbackLooper为null,也就是说mHandler赋值为getMainHandler()绑定了主线程。

    下面是该handler实现的handleMessage方法,一个是处理结果,一个是处理进度的,这里就先不展开细说了,后边分析到工作线程执行到各个阶段时再细说。

            @Override
            public void handleMessage(Message msg) {
                AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
                switch (msg.what) {
                    case MESSAGE_POST_RESULT:
                        // There is only one result
                        result.mTask.finish(result.mData[0]);
                        break;
                    case MESSAGE_POST_PROGRESS:
                        result.mTask.onProgressUpdate(result.mData);
                        break;
                }
            }
    

    mWorker,WorkerRunnable是抽象类,implements自Callable接口,对线程池框架比较熟悉的可以知道它将在子线程中调用执行,了解到这点就够了。

    在mWorker实现的call()方法中最主要的一行代码doInBackground(mParams);它在这里执行了我们的工作任务,然后在catch异常的时候设置了标志位:mCancelled.set(true),最终在finally中执行了postResult()方法。postResult()的逻辑:

    private Result postResult(Result result) {
            //getHandler()会返回mHandler,也就是上面介绍的绑定了主线程Looper对象的Handler
            Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                    new AsyncTaskResult<Result>(this, result));
            message.sendToTarget();
            return result;
        }
    

    这就和我们上面讲到的handleMessage对Message的处理联系了起来,最终会执行finish()方法,并在这里调用我们实现的onCancelled()或者onPostExecute()生命周期:

    private void finish(Result result) {
            //isCancelled()方法会判断是否是取消导致,
            //比如上面说的执行mWorker.call()时catch异常和我们做活动调用AsyncTask的cancel()方法会使isCancelled()返回ture
            if (isCancelled()) {
                onCancelled(result);
            } else {
                onPostExecute(result);
            }
            mStatus = Status.FINISHED;
        }
    

    WorkerRunnable除了implements Callable接口还添加了个mParams数组,也就是doInBackground(mParams),执行时会传入的参数

    private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
            Params[] mParams;
    }
    

    mFuture FutureTask本篇不做深入的了解,它的目的是获取子线程运算结果和提供子线程的中断方法。上面代码对于mFuture的初始化可以看到它将mWorker封装在了内部,所以以后的操作对象都变为了mFuture。mWork只是提供了call()方法(Callable接口)和参数mParams。

    2.execute:

    构造方法就是初始化了三个变量,mHandler,mWorker,mFuture。接下来我们看下execute()执行了哪些步奏:downloadTask.execute(object1, object2);

        public final AsyncTask<Params, Progress, Result> execute(Params... params) {
            //内部调用了executeOnExecutor方法
            return executeOnExecutor(sDefaultExecutor, params);
        }
        public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
                Params... params) {
            if (mStatus != Status.PENDING) {
                switch (mStatus) {
                    case RUNNING:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task is already running.");
                    case FINISHED:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task has already been executed "
                                + "(a task can be executed only once)");
                }
            }
    
            mStatus = Status.RUNNING;
    
            onPreExecute();
            //在这里将mWorker的mParams变量赋值
            mWorker.mParams = params;
            exec.execute(mFuture);
    
            return this;
        }
    

    可以看到execute()就是调用了executeOnExecutor()方法。executeOnExecutor()的前部分就是判断了当前AsyncTask的状态只有在Status.PENDING的状态才可以正常运行,并且在运行后立即将状态改为了Status.RUNNING,也就是说同一个AsyncTask对象只能execute()一次。

    接着就是调用了onPreExecute();也就是我们执行任务前在主线程做一些初始化的操作,比如加载界面。

    最后最重要的工作exec.execute(mFuture)来执行我们的工作任务。毋庸置疑,该方法最终会执行到mWorker的call()方法。我们现在要研究的就是这个Executor对象(即调用executeOnExecutor()时传入的sDefaultExecutor),它是怎样的线程池,我们传入的FutureTask对象将被如何调用。

    sDefaultExecutor

    终于我们要开始研究AsyncTask的核心内容了sDefaultExecutor(其实不是)。为什么说不是呢,因为sDefaultExecutor只是一个任务安排者,并不真正的创建线程执行任务。看下sDefaultExecutor的源码:

        /**
        *sDefaultExecutor最终会指向SerialExecutor对象
        **/
        private static class SerialExecutor implements Executor {
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            Runnable mActive;
            //SerialExecutor并没有任何地方创建线程,execute也只是将任务放进ArrayDeque队列中
            //之前我们传入的是FutureTask对象,而这里需要的参数是Runnable对象是因为,    
            //FutureTask也实现了Runnable接口,并将Callable接口(即mWorker)的call()方法在run()方法中调用
            public synchronized void execute(final Runnable r) {
                mTasks.offer(new Runnable() {
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            scheduleNext();
                        }
                    }
                });
                if (mActive == null) {
                    scheduleNext();
                }
            }
    
            protected synchronized void scheduleNext() {
                if ((mActive = mTasks.poll()) != null) {
                    //THREAD_POOL_EXECUTOR才是最终执行任务的线程池
                    THREAD_POOL_EXECUTOR.execute(mActive);
                }
            }
        }
    

    ok,我们先看下它是怎样安排任务的。1.将任务runnable稍作包装提交到mTasks中并插入到队列尾部。2.判断mActive是否为null,如果不为null则执行scheduleNext()。

    第一次运行AsyncTask时肯定为null,所以会立即执行scheduleNext(),可以看到mTasks.poll()会从队列首部取出任务赋值给mActive,并且通过真正的线程池THREAD_POOL_EXECUTOR去执行。

    当任务完成后会在finally中调用scheduleNext()方法去查看mTasks队列中是否有剩余任务,如果有则安排,没有则mActive置为null。保证下次创建新的AsyncTask时,mActive为正确值(因为sDefaultExecutor为static变量,全局只有一个sDefaultExecutor对象),这也表示如果短时间内创建多个AsyncTask,并且每个AsyncTask执行时间还比较长,那后执行AsyncTask.execute()方法的任务将需要等待前面的任务执行完成才能开始。也就是说AsyncTask其实是串行执行的。
    若想并行执行也是有方法的,直接调用executeOnExecutor(THREAD_POOL_EXECUTOR, params);跳过sDefaultExecutor的代理。

    THREAD_POOL_EXECUTOR

    终于到了研究线程池的这一步了

    /**
         * An {@link Executor} that can be used to execute tasks in parallel.
         */
        public static final Executor THREAD_POOL_EXECUTOR;
    
        static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(), sThreadFactory);
            threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
        }
    

    核心线程数:CORE_POOL_SIZE = 1,最大线程数:MAXIMUM_POOL_SIZE = 20,空闲线程持续时间KEEP_ALIVE_SECONDS = 3(秒),SynchronousQueue这是等待队列,线程工厂: sThreadFactory,最后设置了拒绝策略sRunOnSerialPolicy。
    SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列。也就是说最多同时执行20个线程任务。再有新的任务则会执行拒绝策略sRunOnSerialPolicy,我们看下sRunOnSerialPolicy的源码:

        private static final int BACKUP_POOL_SIZE = 5;
        private static final RejectedExecutionHandler sRunOnSerialPolicy =
                new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                android.util.Log.w(LOG_TAG, "Exceeded ThreadPoolExecutor pool size");
                // As a last ditch fallback, run it on an executor with an unbounded queue.
                // Create this executor lazily, hopefully almost never.
                synchronized (this) {
                    if (sBackupExecutor == null) {
                        sBackupExecutorQueue = new LinkedBlockingQueue<Runnable>();
                        sBackupExecutor = new ThreadPoolExecutor(
                                BACKUP_POOL_SIZE, BACKUP_POOL_SIZE, KEEP_ALIVE_SECONDS,
                                TimeUnit.SECONDS, sBackupExecutorQueue, sThreadFactory);
                        sBackupExecutor.allowCoreThreadTimeOut(true);
                    }
                }
                sBackupExecutor.execute(r);
            }
        };
    

    很简单,又创建了个备用线程池,只有BACKUP_POOL_SIZE个核心线程(5个),然后核心线程也会超时,毕竟备用线程池,逻辑上来讲需要也不需要长时间保留。sBackupExecutorQueue的容量是Integer.Max,所以说若短时间执行大量任务,则会将后续的任务都添加在sBackupExecutorQueue中,然后等待备用线程的调用,这时候,主线程池的即使结束了任务,也不会去执行新的任务,sBackupExecutorQueue中的任务将由备用线程池的5个线程去执行。这点上没有注意到可能会影响AsyncTask的使用效率:

    如下图每隔3秒,子线程的任务完成后,只会有5个新的任务执行(log中的starttask) image.png

    总结:

    整体来说AsyncTask思路很简单,就是创建线程池,在子线程执行任务前调用onPreExecute(),在子线程中调用doInBackground(),然后执行过程中调用publishProgress()通过handler发送进度到主线程执行onProgressUpdate(),最后执行完成后再通过handler发送Result到主线程执行onPostExecute()或者onCancelled()
    需要多注意的就是对于线程池要有一定的了解,否则读起源码来将有些困难。

    相关文章

      网友评论

          本文标题:android AsyncTask 源码分析

          本文链接:https://www.haomeiwen.com/subject/zjmxsltx.html