美文网首页
多线程系列(六)AsyncTask源码分析

多线程系列(六)AsyncTask源码分析

作者: zskingking | 来源:发表于2018-12-14 17:17 被阅读124次

前言

AsyncTask是Android为开发者提供的一个轻量级执行异步任务的框架,如今异步框架被RxJava一统天下,AsyncTask面临着淘汰的窘境,那我为什么还要分析AsyncTask的源码呢?虽然AsyncTask已经很少有人用了,但并不代表它不是一个优秀的框架,其内部实现路程和设计思想还是非常值得我们去借鉴的,每一次源码的阅读何尝不是与大神们进行的一次心灵交互呢!!

1 概述

在早期Android开发者进行线程间通讯基本都会选择Thread+Handler,Thread执行完任务通过Handler把消息发送到指定的线程,从而实现线程间通讯,但该方式太过于繁琐,又要写线程又要写Handler,Google也想到了这一点,所以就推出了AsyncTask这个异步任务框架,使用起来非常简便。

2 AsyncTask基本使用

首先创建一个类继承AsyncTask

**
 * 三个泛型:
 *  1.为execute()传入的类型,与doInBackground对应
 *  2.进度更新的类型,与onProgressUpdate对应
 *  3.任务执行结果类型,与onPostExecute对应
 */
public class MyAsyncTask extends AsyncTask<String,Integer,String> {

    private Dialog mDialog;

    public MyAsyncTask(Dialog dialog){
        this.mDialog = dialog;
    }
    //任务执行前做一些附加操作,比如打开一个对话框
    @Override
    protected void onPreExecute() {
        super.onPreExecute();
        mDialog.show();
    }

    //在工作线程中执行,耗时任务写在该方法中
    //返回值为任务执行结果
    @Override
    protected String doInBackground(String... params) {
        Log.i("MyAsyncTask","start execution");
        //模拟耗时操作
        for(int i =0;i<100;i++){
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //更新进度
            publishProgress(i);
        }
        return "success";
    }

     //可以在doInBackground()中调用publishProgress()来触发
    @Override
    protected void onProgressUpdate(Integer... values) {
        super.onProgressUpdate(values);
        Log.i("MyAsyncTask","progress "+values[0]);
    }

    //任务执行结果,参数为doInBackground()返回值
    @Override
    protected void onPostExecute(String s) {
        super.onPostExecute(s);
        Log.i("MyAsyncTask","result "+s);
        mDialog.dismiss();
    }

    //取消任务
    @Override
    protected void onCancelled() {
        super.onCancelled();
        mDialog.dismiss();
    }
}

没有方法的作用注释写的很清楚
使用方法

public class MainActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        AlertDialog.Builder builder = new AlertDialog.Builder(this);
        builder.setMessage("正在加载");
        Dialog dialog = builder.create();
        //如果进行的是一个网络请求,execute(Object obj)传入的应该是一个URL
        new MyAsyncTask(dialog).execute("");
    }
}

使用非常的简便,并且还可以在任务执行中更新进度条。

3 AsyncTask源码分析

AsyncTask在Android不同版本中存在略微差异,但实现原理都是一样的,本篇文章基于Android API26进行分析。

AsyncTask是一个抽象类,调用者使用时需要实现其doInBackground(Params... params)方法

public abstract class AsyncTask<Params, Progress, Result> 
     ...
     ...
    @WorkerThread
    protected abstract Result doInBackground(Params... params);
}

我们首先从excute()方法开始分析:

  //核心线程数
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    //最大线程数
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    //闲置时间
    private static final int KEEP_ALIVE_SECONDS = 30;
    //线程工厂类
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);
        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    //任务队列,队列长度为128
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);
    
    public static final Executor THREAD_POOL_EXECUTOR;
    //自定义静态线程池,之所以定义为静态,是可以让多个AsyncTask对象共用同一个线程池
    static {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();


    //该池只负责接收任务,任务最终还是由THREAD_POOL_EXECUTOR进行执行
    private static class SerialExecutor implements Executor {
        //接收任务的队列
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        //当前正在执行的Runnable对象
        Runnable mActive;
        //将任务进行同步,使线程池同一时间只能加入一个任务
        public synchronized void execute(final Runnable r) {
            //多创建一个Runnable对象是为了对r进行一个封装,
            //避免调用者自己去调用scheduleNext()方法。
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            //队列为空
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            //从队列mTasks中取任务,如果有任务就交由线程池处理
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

    //默认线程池
    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    @MainThread
    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
            return executeOnExecutor(sDefaultExecutor, params);
     }

    @MainThread
     public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
                                                                       Params... params) {
        if (mStatus != AsyncTask.Status.PENDING) {
            switch (mStatus) {
                //正在运行
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                //已经结束
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }
        //置为运行状态
        mStatus = AsyncTask.Status.RUNNING;
        //做一些预备操作,比如开启一个对话框
        onPreExecute();
        mWorker.mParams = params;
        exec.execute(mFuture);
        return this;
    }

通过调用AsyncTask的excute()方法最终会由线程池去执行任务,AsyncTask内维护了两个线程池,分别用来管理任务和执行任务,两个线程池均为静态,这也可以让所有的AsyncTask对象公用一个线程池,避免了重复创建线程池产生的效率问题。还有一点需要注意,正在运行的AsyncTask不能再次调用execute(),否则会弹出IllegalStateException()异常,同时一个AsyncTask对象只能执行一次excute(),否则也会弹出IllegalStateException()异常。另外AsyncTask是串行的,结合代码可以看到当一个任务执行完毕后才会调用scheduleNext()从队列取出下一个任务然后执行。

当一些初始化工作完成后会通过线程池执行mFuture这个任务,mFuture是在AsyncTask构造方法中创建的,我们来看一下AsyncTask构造方法。

构造方法:


  public AsyncTask() {
        this((Looper) null);
    }

 public AsyncTask(@Nullable Handler handler) {
        this(handler != null ? handler.getLooper() : null);
    }
 public AsyncTask(@Nullable Looper callbackLooper) {
        //获取到一个Handler,默认为主线程Handler
        //该Handler决定接收执行结果的线程
        mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
                ? getMainHandler()
                : new Handler(callbackLooper);

        //创建一个WorkerRunnable
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    //进行异步操作
                    // // 即 我们使用过程中复写的方法
                    result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    //异步操作结束后执行该方法
                    postResult(result);
                }
                return result;
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            //任务取消和结束都会调用该方法
            //不同的是任务结束FutureTask内部会自行调用done()方法
            //任务取消需要手动的调用FutureTask的cancel方法然后cancel方法再对done调用
            //不同的调用方式传入不同的参数done方法也会进行不通的操作
            @Override
            protected void done() {
                try {
                    //任务结束
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    //任务出现中断异常
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    //任务异常
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    //任务取消
                    postResultIfNotInvoked(null);
                }
            }
        };

在AsyncTask中线程任务是通过Callbale进行封装的,不了解Callbale的同学可以参考我这篇文章
初始化最终是通过调用拥有Looper参数的构造方法,内部实例化了三个成员变量:mHandler 、mWorker 、mFuture

  • mHandler :该Handler决定接收执行结果的线程,默认获取主线程Handler
  • mWorker :进行耗时任务的封装
  • mFuture :用于监听任务结束、异常、取消

在Worker里面我们的耗时任务doInBackground(mParams)执行完毕后会通过postResult(result)对执行结果进行发送,我们来看一下postResult(result)源码:

    private static InternalHandler sHandler;
    private static Handler getMainHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler(Looper.getMainLooper());
            }
            return sHandler;
        }
    }
    private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }
    //通常在doInBackground()中调用
    @WorkerThread
    protected final void publishProgress(Progress... values) {
        //判断是否已经取消
        if (!isCancelled()) {
            getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                    new AsyncTaskResult<Progress>(this, values)).sendToTarget();
        }
    }

    //主线程Handler
    private static class InternalHandler extends Handler {
        public InternalHandler(Looper looper) {
            super(looper);
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                //通过postResult()进行发送
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                //通过publishProgress()进行发送
                case MESSAGE_POST_PROGRESS:
                    //更新进度
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

  private void finish(Result result) {
        //进行了取消操作
        if (isCancelled()) {
            onCancelled(result);
        } 
        //未进行取消操作
        else {
            onPostExecute(result);
        }
        //置为结束状态
        mStatus = MyAsyncTask.Status.FINISHED;
    }

调用了 postResult(Result result)方法后会将result以Message的形式发送到主线程,主线程Handler接收到消息后会调用AsyncTask的finish(result)方法,在finish(result)中会进行一个判断,如果进行了取消操作会调用onCancelled(result)否则会调用 onPostExecute(result),最后将mStatus 置为结束状态。

分析完了AsyncTask的源码我们来总结一下其优点:

  • 内部线程池为静态类型,所有任务共用一个线程池,避免了重复创建线程池
  • 通过Callable+FutureTask执行线程任务,可监听线程的结束、异常、取消
  • 通过publishProgress()对任务进度进行实时监听

总结

AsyncTask的定位是一款轻量级的Android异步框架,内部通过Executor+Handler进行封装,可以轻松实现线程间通信,整个源代码也就700行左右,所以我建议大家能够一行一行的看一遍。

乐观的人在每个危机里看到机会,悲观的人在每个机会里看见危机 - 丘吉尔

这篇文章是我多线程系列最后一篇文章,同时也是我两个月里面的第八篇文章,此期间不管工作多忙我都坚持每周发一篇文章,其实我一直都不认为自己是一个勤快的人,但回顾一下却发现过去的一年确实养成了很多好习惯,想想也还是挺欣慰的。本篇文章到此为止,下篇文章Android网络编程(一)传输层协议UDP、TCP

相关文章

网友评论

      本文标题:多线程系列(六)AsyncTask源码分析

      本文链接:https://www.haomeiwen.com/subject/ogqghqtx.html