Android 的系统异步工具类AsyncTask源码解析

作者: walker_lee0707 | 来源:发表于2017-04-24 10:28 被阅读0次

    由于从wordpress将文章倒回,发现格式有点混乱,经努力调整后依然有部分的格式还未调教好,请多多包涵.

    分析AsyncTask或者阐述AsyncTask的博文有很多,本着授人予鱼不如授人予渔的想法我想通过自主学习的方式去探索这个api,如果有阐述不当的地方欢迎各位大神不吝斧正.本文将通过源码学习分析以及demo实例验证的方式彻底的了解AsyncTask运行的原理.

    随着开源社区的兴旺,在目前android的开发中有着各种异步的工具以及框架,现在比较热门的当属Rxjava+RxAndroid这个React的开源库,至于这个从Android1.5版本就开始提供的异步工具AsyncTask反而经常出现在Android各类异步实现文章中的反面例子.我写这篇文章有以下的意图:一是通过源码和实例去探讨为什么AsyncTask一直被吐槽.二是了解学习AsyncTask的实现方式(只有了解原理,在使

    用中遇到各式问题才有对策).

    关于AsyncTask的介绍以及使用例子可以移步官方开发者网站,这里就不多费口舌.AsyncTask的存在的原因是在Android应用运行过程中,耗时的任务不能放在UI线程也就是我们常说的主线程中执行,必须开启线程在后台执行,如果有执行结果要通知到页面上的话需要通过handler进行子线程和UI线程的通信,而AsyncTask就是应运而生去简化这一过程的api.

    现在进入实现原理的正题,通过学习源码的过程可以设想如果是要求自己实现一个这样的异步工具应当怎么做,任何创造的前提是从模仿开始.笔者这边使用的sdk version 23的源码.首先是类的定义:

    public abstract class AsyncTask {

        private static final String LOG_TAG = "AsyncTask";

        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

        private static final int CORE_POOL_SIZE = CPU_COUNT + 1;

        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

        private static final int KEEP_ALIVE = 1;

        private static final ThreadFactory sThreadFactory = new ThreadFactory() {

        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {

             return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());

        }

        };

        private static final BlockingQueue sPoolWorkQueue =new LinkedBlockingQueue(128);

    /**

    * An {@link Executor} that can be used to execute tasks in parallel.

    */

    public static final Executor THREAD_POOL_EXECUTOR

    = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,

    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

    AsyncTask是抽象类,根据使用的需要定义了三个泛型分别是传入的参数,执行的Progress,以及执行的结果.通过这一小段的源码可以看到AsyncTask拥有一个静态final的THREAD_POOL_EXECUTOR,这个线程池能够运行的线程最大数是通过 cpu核心数*2+1 计算得出.并且在一个进程中无论开发者写出多少个AsyncTask的实现子类最终管理线程的只是这个大小为128的线程池.如果连续添加超过128个任务,线程池就爆了(这也是被其他开发者诟病的地方,不过这样的开发写法是否符合规范也待商榷).现在接着往下看:

    /**

    * An {@link Executor} that executes tasks one at a time in serial

    * order.  This serialization is global to a particular process.

    */

    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

    private static final int MESSAGE_POST_RESULT = 0x1;

    private static final int MESSAGE_POST_PROGRESS = 0x2;

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

    private static InternalHandler sHandler;

    private final WorkerRunnable mWorker;

    private final FutureTask mFuture;

    private volatile Status mStatus = Status.PENDING;

    private final AtomicBoolean mCancelled = new AtomicBoolean();

    private final AtomicBoolean mTaskInvoked = new AtomicBoolean();

    private static class SerialExecutor implements Executor {

    final ArrayDeque mTasks = new ArrayDeque();

    Runnable mActive;

    public synchronized void execute(final Runnable r) {

    mTasks.offer(new Runnable() {

    public void run() {

    try {

    r.run();

    } finally {

    scheduleNext();

    }}});

    if (mActive == null) {

    scheduleNext();

    }}

    protected synchronized void scheduleNext() {

        if ((mActive = mTasks.poll()) != null) {

         THREAD_POOL_EXECUTOR.execute(mActive);

    }}}

    刚才执行的线程池构建好了,现在轮到Exector SERIAL_EXECUTOR,看这个对象的命名为serial,感觉像是单个按顺序的执行器(非并发),我们接着往下看:

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

    默认的executor为SERIAL_EXECUTOR 并带上了volatile(这里带上volatile类型修饰符的原因是在多线程编程中开发者可以通过调用AsyncTask.setDefaultExecutor(Executor executor)使用自定义的Executor替换SERIAL_EXECUTOR.此处先不急着看SERIAL_EXECUTOR.接着是

    private static InternalHandler sHandler;

    又是一个静态的对象InternalHandler,贴出类声明:

    private static class InternalHandler extends Handler {

    public InternalHandler() {

    super(Looper.getMainLooper());

    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})

    @Override

    public void handleMessage(Message msg) {

    AsyncTaskResult result = (AsyncTaskResult) msg.obj;

    switch (msg.what) {

    case MESSAGE_POST_RESULT:

    // There is only one result

    result.mTask.finish(result.mData[0]);

    break;

    case MESSAGE_POST_PROGRESS:

    result.mTask.onProgressUpdate(result.mData);

    break;

    }

    }

    }

    根据对象名称有internal说明是一个内部的事件handler,构造函数带有super(Looper.getMainLooper())说明是这个Handler的handleMessage(Message msg)的方法是在主线程中调用,分别有两个事件是处理result信息和progress信息这里也就满足了之前类设计需求需要在主线程中反馈执行结果.剩下的

    private final WorkerRunnable mWorker;

    private final FutureTask mFuture;

    private volatile Status mStatus = Status.PENDING;

    private final AtomicBoolean mCancelled = new AtomicBoolean();

    private final AtomicBoolean mTaskInvoked = new AtomicBoolean();

    将放在通过调用类的构造函数以及执行的流程中讲诉.以上我们将AsyncTask的属性分析完毕,接下来通过实例调用过程来进行源码旅程.Demo的例子可以查看Github.

    我们创建一个AsyncTask的子类TimeConsumingTask,模拟后台耗时的操作以及反馈结果给UI线程.

    public class TimeConsumingTask extends AsyncTask {

    private static String TAG = "TimeConsumingTask";

    private WeakReference mHandler;

    public static final String TAG_RESULT = "result";

    private static volatile int sExecutionCount ;

    public TimeConsumingTask(Handler handler) {

    mHandler = new WeakReference(handler);

    }

    @Override

    protected void onPreExecute() {

    Log.d(TAG, "onPreExecute");

    }

    @Override

    protected Boolean doInBackground(Integer... params) {

    Log.d(TAG,"execute count is :"+ ++sExecutionCount);

    for (Integer num : params) {

    try {

    Thread.sleep(num * 1000);

    //will call onProgressUpdate()

    publishProgress(num);

    } catch (InterruptedException e) {

    e.printStackTrace();

    return false;

    }}

    return true;

    }

    @Override

    protected void onProgressUpdate(Integer... values) {

    for (Integer value : values) {

    Log.d(TAG, "onProgressUpdate result: " + value);

    if (null != mHandler.get()) {

    Message message = new Message();

    message.what = value;

    mHandler.get().sendMessage(message);

    }

    }

    }

    @Override

    protected void onPostExecute(Boolean aBoolean) {

    Log.d(TAG, "onPostExecute result: " + aBoolean);

    if (null != mHandler.get()) {

    Message message = new Message();

    message.getData().putBoolean(TAG_RESULT, aBoolean);

    mHandler.get().sendMessage(message);

    }}}

    这里构造方法我是通过传入一个弱引用的主线程的handler进行运行结果的反馈,虽然只是demo还是希望写的严谨些.子类会率先调用父类的构造方法,此时我们来关注AsyncTask的构造方法,里面包含了刚才未说明的两个属性WorkerRunnable和FutureTask,FutureTask是java1.5引入的api,源码说明是A cancellable asynchronous computation.具体的可以建议读者去好好学习下,当下android很多流行的开源库中实现都离不开FutureTask,这里就不做介绍.我们来看AsyncTask的构造函数:

    /**

    * Creates a new asynchronous task. This constructor must be invoked on the UI thread.

    */

    public AsyncTask() {

    mWorker = new WorkerRunnable() {

    public Result call() throws Exception {

    mTaskInvoked.set(true);

    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

    //noinspection unchecked

    Result result = doInBackground(mParams);

    Binder.flushPendingCommands();

    return postResult(result);

    }

    };

    mFuture = new FutureTask(mWorker) {

    @Override

    protected void done() {

    try {

    postResultIfNotInvoked(get());

    } catch (InterruptedException e) {

    android.util.Log.w(LOG_TAG, e);

    } catch (ExecutionException e) {

    throw new RuntimeException("An error occurred while executing doInBackground()",

    e.getCause());

    } catch (CancellationException e) {

    postResultIfNotInvoked(null);

    }}};

    }

    private static abstract class WorkerRunnable implements Callable {

    Params[] mParams;

    }

    private void postResultIfNotInvoked(Result result) {

    final boolean wasTaskInvoked = mTaskInvoked.get();

    if (!wasTaskInvoked) {

    postResult(result);

    }}

    private Result postResult(Result result) {

    @SuppressWarnings("unchecked")

    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,

    new AsyncTaskResult(this, result));

    message.sendToTarget();

    return result;

    }

    我们可以看到WorkerRunnable是实现Callable在call方法中修改了运行标识|设置线程优先级|以及真正的运行doInBackground(Param)的方法,以及在运行结束后调用postResult(result).其中 Binder.flushPendingCommands();这句的调用可能很多朋友不甚熟悉,可以点击方法跳转至源码查看说明,这里留个悬念.在FutureTask的主要是重写了done()的回调进行如果是未调用后台方法就结束了异步任务的判断,并抛出异常,此处可以发现好的api是在方法命名中就清楚的告诉了阅读者意图.此处的getHandler()就是通过lazyInit的方式获取刚才说到的变量InternalHandler进行通信.

    我们看完了构造方法,现在来看AsyncTask的调用方法,AsyncTask的api设计十分简单,可调用方法execute(Params...)/cancel(boolean mayInterruptIfRunning)/...,接下来看下一般继承AsyncTask一般要重写的四个方法:onPreExecute(),doInBackground(Param...), onProgressUpdate(Progress... values) ,onPostExecute(Result result),接下来会讲解这四个方法在源码中被调用的时机.我们先看下核心方法execute(Params...).进入源码:

    @MainThread

    public final AsyncTask execute(Params... params) {

    return executeOnExecutor(sDefaultExecutor, params);

    }

    @MainThread

    public final AsyncTask executeOnExecutor(Executor exec,

    Params... params) {

    if (mStatus != Status.PENDING) {

    switch (mStatus) {

    case RUNNING:

    throw new IllegalStateException("Cannot execute task:"

    + " the task is already running.");

    case FINISHED:

    throw new IllegalStateException("Cannot execute task:"

    + " the task has already been executed "

    + "(a task can be executed only once)");

    }}

    mStatus = Status.RUNNING;

    onPreExecute();

    mWorker.mParams = params;

    exec.execute(mFuture);

    return this;

    }

    在方法头加入了google annotation@MainThread,如果还不了解google annotation的朋友也可以上官方开发者网站上搜索进行了解.说明execute方法需要运行在主线程中.execute的方法调用了executeOnExecutor并传入刚才看到的sDefaultExecutor也就是SERIAL_EXECUTOR以及AsyncTask中三个泛型中的第一个Param作为参数.这里在executeOnExecutor运行中对Status进行了检查,如果非PENDING态则抛出异常.Status只有PENDING/RUNNING/FINISHED,三种状态,初始化的时候是PENDING态,这也是我们不能对同一个task反复调用execute的原因.更改状态之后就调用了 onPreExecute();所以开发者继承AsyncTask重写的四大方法中第一个onPreExecute()是运行在主线程中.

    exec.execute(mFuture);

    AsyncTask的sDefaultExecutor开始执行我们在构造函数中初始化的futureTask了.此处让我们将目光转向sDefaultExecutor的默认赋值SERIAL_EXECUTOR:

    private static class SerialExecutor implements Executor {

    final ArrayDeque mTasks = new ArrayDeque();

    Runnable mActive;

    public synchronized void execute(final Runnable r) {

    mTasks.offer(new Runnable() {

    public void run() {

    try {

    r.run();

    } finally {

    scheduleNext();

    }}});

    if (mActive == null) {

    scheduleNext();

    }

    }

    protected synchronized void scheduleNext() {

    if ((mActive = mTasks.poll()) != null) {

    THREAD_POOL_EXECUTOR.execute(mActive);}}}

    好的代码总是简洁明快,通过ArrayDeque 存储传入的任务,并在scheduleNext通过THREAD_POOL_EXECUTOR.execute(task)的方式进行任务调用,这里就不多做阐述了,这个Executor做了和它类名一样的实行就是序列化执行列表中的任务.SERIAL_EXECUTOR是在3.0版本后加入的,说明3.0版本后AsyncTask默认不是并行化执行任务而是顺序执行.也就是说在3.0之前的AsyncTask可以同时有5个任务在执行,而3.0之后的AsyncTask同时只能有1个任务在执行.

    之前在文章中已经讲述在WorkerRunnable的call()方法中调用了耗时操作方法doInBackground(Params...),那在后台执行的过程中更新UI的方法onProgressUpdate(Progress... values)又是怎么被调用的,根据官方文档在doInBackground方法中需要post结果到主线程的时候会调用publishProgress(Progress...)方法.而这个方法和主线程有关系我相信读者已经有一个概念该方法中是通过往IntervalHandler发出消息来实现:

    @WorkerThread

    protected final void publishProgress(Progress... values) {

    if (!isCancelled()) {

    getHandler().obtainMessage(MESSAGE_POST_PROGRESS,

    new AsyncTaskResult(this, values)).sendToTarget();

    }

    }

    这边提一下方法头的googleAnnotation @WorkerThread说明该方法只能在工作线程中被调用.

    Demo地址:在使用Demo时候可以通过DDMS的Thread监控功能进行AsyncTask的线程池监控验证.

    这里只是抛砖引玉写一篇AsyncTask的源码分析,希望各位大神能热心参与android异步编程的讨论.

    Android异步一些轻量实现的讨论文章:https://medium.com/@ali.muzaffar/handlerthreads-and-why-you-should-be-using-them-in-your-android-apps-dc8bf1540341#.6de7zdbii

    相关文章

      网友评论

        本文标题:Android 的系统异步工具类AsyncTask源码解析

        本文链接:https://www.haomeiwen.com/subject/uvvxittx.html