AsyncTask 源码浅析

作者: 云天明送恒心 | 来源:发表于2017-04-20 10:35 被阅读79次

    AsyncTask 源码浅析

    作用

    是围绕HandlerThread开发的帮助类,方便开发者在子线程执行几秒钟以内的耗时任务,在主线程返回任务执行的结果。简而言之,是对异步操作的封装

    四个重要回调方法

    • onPreExecute()
    • doInBackground()
    • onProgressUpdate()
    • onPostExecute()

    只有doInBackground()在线程池执行,其余方法在主线程执行

    执行过程

    1. 编译期创建线程池

    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    private static final int KEEP_ALIVE_SECONDS = 30;
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);
        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };
    
    private static final BlockingQueue<Runnable> 
    sPoolWorkQueue =new LinkedBlockingQueue<Runnable>(128);
    public static final Executor THREAD_POOL_EXECUTOR;
    
    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 
                    KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
        //超过闲置等待时间后允许回收所有线程,包括核心线程
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    //默认的Executor
    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
    

    THREAD_POOL_EXECUTOR即AsyncTask的线程池之一,根据Math.max(2, Math.min(CPU_COUNT - 1, 4))设置线程池的核心线程数最小为2,最大为4,最大线程数CPU_COUNT * 2 + 1,线程闲置等待时间为30秒,任务队列LinkedBlockingQueue最大长度为128,线程工厂设置线程的name为AsyncTask # i++

    THREAD_POOL_EXECUTORSERIAL_EXECUTOR会被所有AsyncTask实例共用

    2. 构造方法

    只做了两件事,初始化mWorker(也就是Callable)和mFuture(也就是FutureTask) 属于Java并发包下的两个类,下面对他们进行简单的介绍

    • Callable 简单来说就是可以抛出异常的,有返回值的Runnable
    • FutureTask 扩展了RunnableFuture,而RunnableFuture又扩展了RunnableFuture。将会执行构造方法传入的Callable,并取得返回值

    虽然这两个类里的方法还没有被调用
    但是我们却可以很清晰的看见整个AsyncTask的工作流程了

        public AsyncTask() {
            //实现了Callable
            mWorker = new WorkerRunnable<Params, Result>() {
                public Result call() throws Exception {
                    //标识有任务被调用了
                    mTaskInvoked.set(true);
                    Result result = null;
                    try {
                        //标准的后台线程优先级,优先级不是太高 
                        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                        //调用我们的四个重要抽象方法之二
                        result = doInBackground(mParams);
                        Binder.flushPendingCommands();
                    } catch (Throwable tr) {
                        mCancelled.set(true);
                        throw tr;
                    } finally {
                        //Handler调用,暂且按下不表
                        postResult(result);
                    }
                    return result;
                }
            };
    
            mFuture = new FutureTask<Result>(mWorker) {
                @Override
                protected void done() {
                    try {
                        //处理未能顺利完成调用的情况
                        postResultIfNotInvoked(get());
                    } catch (InterruptedException e) {
                        android.util.Log.w(LOG_TAG, e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException("An error occurred while executing doInBackground()",
                                e.getCause());
                    } catch (CancellationException e) {
                        postResultIfNotInvoked(null);
                    }
                }
            };
        }
    

    3. executeOnExecutor

    所有的执行命令最终都会调用这个方法
    所以是我们开始使用AsyncTask的第一站
    这个方法只能在主线程调用

        @MainThread
        public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
                Params... params) {
            if (mStatus != Status.PENDING) {
                switch (mStatus) {
                    case RUNNING:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task is already running.");
                    case FINISHED:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task has already been executed "
                                + "(a task can be executed only once)");
                }
            }
            //设置状态为RUNNING
            mStatus = Status.RUNNING;
            //调用我们四个重要抽象方法之一
            onPreExecute();
            //把Params传给我们实现Caller的Worker
            mWorker.mParams = params;
            //调用Executor执行,同时把我们的FutureTask传进去
            //默认的Executor是在编译器就初始化好的SERIAL_EXECUTOR,串行执行
            exec.execute(mFuture);
            return this;
        }
    

    接下来会调用Executor.execute()
    下面看一下我们默认的Executor,也就是SerialExecutor

        private static class SerialExecutor implements Executor {
            //线性双端队列
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            Runnable mActive;
    
            public synchronized void execute(final Runnable r) {
                //提交一个Runnable到双端队列
                mTasks.offer(new Runnable() {
                    public void run() {
                        try {
                            //运行我们的mFuture,忘记了这是啥?回去看第二部分
                            r.run();
                        } finally {
                            //调度下一个
                            scheduleNext();
                        }
                    }
                });
                //如果没有正在活动的任务,就调度下一个
                if (mActive == null) {
                    scheduleNext();
                }
            }
            
            protected synchronized void scheduleNext() {
                if ((mActive = mTasks.poll()) != null) {
                    //终于提交到线程池执行了,THREAD_POOL_EXECUTOR是啥?忘了的话去看第一部分
                    THREAD_POOL_EXECUTOR.execute(mActive);
                }
            }
        }
    

    4. THREAD_POOL_EXECUTOR中的调用

    这一部分其实已经超出了我们分析的范围了,所以 我们不会去关心线程池的实现,而是理一理调用流程就好

    记住了,我们execute到线程池的Runnable,是pollArrayDequeRunnable,这个Runnable中调用了mFuture.run()

    我们关心的是传到线程池的Runnable

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    这个方法没啥说的,就是做一下检查,然后调用addWorker(),这个方法里面,会找到一个由ThreadFactory创建的线程,这个第一部分我们有交代,然后在这个线程中调用Runnable.run(),最终调用到mFurture.run( )

        public void run() {
            if (state != NEW ||
                !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
                return;
            try {
                //构造方法传进来的callable,对应于AsyncTask中的mWorker,我们第二部分有讲
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        //调用mWorker.call(),并且取得返回值。我们第二部分有讲
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                runner = null;
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    这里面都是在子线程操作了哈,调用了mWorker.call(),回调了四个重要抽象方法的doInBackground()哈,然后这个方法的返回值作为result,这个result交给Handler了咯,完了后调用mFuture.done()

    5. Handler中的调用

    先看看mWorker.call()中的postResult(result);
    这里面都是在子线程执行的,所以用Handler返回主线程

        private Result postResult(Result result) {
            @SuppressWarnings("unchecked")
            //result是我们在doInBackground()返回的值
            Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                    new AsyncTaskResult<Result>(this, result));
            message.sendToTarget();
            return result;
        }
    
        private static Handler getHandler() {
            synchronized (AsyncTask.class) {
                if (sHandler == null) {
                    sHandler = new InternalHandler();
                }
                return sHandler;
            }
        }
    
        private static class InternalHandler extends Handler {
            public InternalHandler() {
                super(Looper.getMainLooper());
            }
    
            @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
            @Override
            public void handleMessage(Message msg) {
                AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
                switch (msg.what) {
                    case MESSAGE_POST_RESULT:
                        result.mTask.finish(result.mData[0]);
                        break;
                    case MESSAGE_POST_PROGRESS:
                        result.mTask.onProgressUpdate(result.mData);
                        break;
                }
            }
        }
    

    case MESSAGE_POST_RESULT: 调用了finish()

        private void finish(Result result) {
            if (isCancelled()) {
                onCancelled(result);
            } else {
                onPostExecute(result);
            }
            mStatus = Status.FINISHED;
        }
    

    最后调用onPostExecute(result),整个流程结束

    publishProgress()还是借助的AsyncTaskResult<DATA>,没什么好分析的,但是代码还是贴出来

        protected final void publishProgress(Progress... values) {
            if (!isCancelled()) {
                getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                        new AsyncTaskResult<Progress>(this, values)).sendToTarget();
            }
        }
    

    总结

    AsyncTask利用Java并发包下的FutureTask来封装了一次请求的信息,SerialExecutor来保证串行执行,利用THREAD_POOL_EXECUTOR线程池在子线程执行任务,用Handler返回主线程

    线程池中子线程执行的是在SerialExecutor中添加到ArrayDequeRunnable,这里面会串行的调用Runnable中的mFuture.run()mFuture.run()会调用mWorker.call()mWorker.call()会回调doInBackground()并且取得返回值

    优点

    • 足够的轻巧,全局维护仅一个线性的队列,一个线程池
    • 相比Handler来说,足够的方便
    • 编写好一个AsyncTask后,有一定的复用性

    缺点

    • 需要继承AsyncTask,如果它作为Activity的内部类,要小心内存泄漏
    • 虽然提供了一个API来取消一次请求,但是不一定能够取消掉
    • 一次事件的起因和结果是耦合在一起的
    • 除了足够轻巧,都不如RxJava,体现在你无法合理的控制流程,怎么合并多个任务?怎么串联多个任务?怎么拆分多个任务?流程中出现错误怎么办?

    终上所述,AsyncTask的使用价值以及不足够明显,我们不需要再使用它了

    相关文章

      网友评论

        本文标题:AsyncTask 源码浅析

        本文链接:https://www.haomeiwen.com/subject/hbgkzttx.html