美文网首页
AsyncTask原理分析

AsyncTask原理分析

作者: 左上偏右 | 来源:发表于2016-12-27 17:02 被阅读58次

    线程池ThreadPoolExecutor
    JDK5带来的一大改进就是Java的并发能力,它提供了三种并发武器:并发框架Executor,并发集合类型如ConcurrentHashMap,并发控制类如CountDownLatch等;圣经《Effective Java》也说,尽量使用Exector而不是直接用Thread类进行并发编程。
    AsyncTask内部也使用了线程池处理并发;线程池通过ThreadPoolExector
    类构造,这个构造函数参数比较多,它允许开发者对线程池进行定制,我们先看看这每个参数是什么意思,然后看看Android是以何种方式定制的。
    ThreadPoolExecutor的其他构造函数最终都会调用如下的构造函数完成对象创建工作:
    1234567

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

    corePoolSize: 核心线程数目,即使线程池没有任务,核心线程也不会终止(除非设置了allowCoreThreadTimeOut参数)可以理解为“常驻线程”
    maximumPoolSize: 线程池中允许的最大线程数目;一般来说,线程越多,线程调度开销越大;因此一般都有这个限制。
    keepAliveTime: 当线程池中的线程数目比核心线程多的时候,如果超过这个keepAliveTime的时间,多余的线程会被回收;这些与核心线程相对的线程通常被称为缓存线程
    unit: keepAliveTime的时间单位
    workQueue: 任务执行前保存任务的队列;这个队列仅保存由execute提交的Runnable任务
    threadFactory: 用来构造线程池的工厂;一般都是使用默认的;
    handler: 当线程池由于线程数目和队列限制而导致后续任务阻塞的时候,线程池的处理方式。

    那么,当一个新的任务到达的时候,线程池中的线程是如何调度的呢?(别慌,讲这么一大段线程池的知识,是为了理解AsyncTask;Be Patient)
    如果线程池中线程的数目少于corePoolSize,就算线程池中有其他的没事做的核心线程,线程池还是会重新创建一个核心线程;直到核心线程数目到达corePoolSize(常驻线程就位)
    如果线程池中线程的数目大于或者等于corePoolSize,但是工作队列workQueue没有满,那么新的任务会放在队列workQueue中,按照FIFO的原则依次等待执行;(当有核心线程处理完任务空闲出来后,会检查这个工作队列然后取出任务默默执行去)
    如果线程池中线程数目大于等于corePoolSize,并且工作队列workQueue满了,但是总线程数目小于maximumPoolSize,那么直接创建一个线程处理被添加的任务。
    如果工作队列满了,并且线程池中线程的数目到达了最大数目maximumPoolSize,那么就会用最后一个构造参数handler
    处理;**默认的处理方式是直接丢掉任务,然后抛出一个异常。

    总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于 corePoolSize,再看缓冲队列 workQueue 是否满,最后看线程池中的线程数量是否大于 maximumPoolSize。另外,当线程池中的线程数量大于 corePoolSize 时,如果里面有线程的空闲时间超过了 keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

    我们以API 22为例,看一看AsyncTask里面的线程池是以什么参数构造的;AsyncTask里面有“两个”线程池;一个THREAD_POOL_EXECUTOR
    一个SERIAL_EXECUTOR
    ;之所以打引号,是因为其实SERIAL_EXECUTOR
    也使用THREAD_POOL_EXECUTOR
    实现的,只不过加了一个队列弄成了串行而已,那么这个THREAD_POOL_EXECUTOR
    是如何构造的呢?
    123456789

    private static final int CORE_POOL_SIZE = CPU_COUNT + 1;private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;private static final int KEEP_ALIVE = 1;private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128); public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

    可以看到,AsyncTask里面线程池是一个核心线程数为CPU + 1
    ,最大线程数为CPU * 2 + 1
    ,工作队列长度为128的线程池;并且没有传递handler
    参数,那么使用的就是默认的Handler(拒绝执行).
    那么问题来了:
    如果任务过多,那么超过了工作队列以及线程数目的限制导致这个线程池发生阻塞,那么悲剧发生,默认的处理方式会直接抛出一个异常导致进程挂掉。假设你自己写一个异步图片加载的框架,然后用AsyncTask实现的话,当你快速滑动ListView的时候很容易发生这种异常;这也是为什么各大ImageLoader都是自己写线程池和Handlder的原因。

    这个线程池是一个静态变量;那么在同一个进程之内,所有地方使用到的AsyncTask默认构造函数构造出来的AsyncTask都使用的是同一个线程池,如果App模块比较多并且不加控制的话,很容易满足第一条的崩溃条件;如果你不幸在不同的AsyncTask的doInBackgroud里面访问了共享资源,那么就会发生各种并发编程问题。

    在AsyncTask全部执行完毕之后,进程中还是会常驻corePoolSize个线程;在Android 4.4 (API 19)以下,这个corePoolSize是hardcode的,数值是5;API 19改成了cpu + 1
    ;也就是说,在Android 4.4以前;如果你执行了超过五个AsyncTask;然后啥也不干了,进程中还是会有5个AsyncTask线程;不信,你看:


    Handler
    AsyncTask里面的handler很简单,如下(API 22代码):
    12345

    private static final InternalHandler sHandler = new InternalHandler();public InternalHandler() { super(Looper.getMainLooper());}

    注意,这里直接用的主线程的Looper;如果去看API 22以下的代码,会发现它没有这个构造函数,而是使用默认的;默认情况下,Handler会使用当前线程的Looper,如果你的AsyncTask是在子线程创建的,那么很不幸,你的onPreExecute
    和onPostExecute
    并非在UI线程执行,而是被Handler post到创建它的那个线程执行;如果你在这两个线程更新了UI,那么直接导致崩溃。这也是大家口口相传的AsyncTask必须在主线程创建的原因。
    另外,AsyncTask里面的这个Handler是一个静态变量,也就是说它是在类加载的时候创建的;如果在你的APP进程里面,以前从来没有使用过AsyncTask,然后在子线程使用AsyncTask的相关变量,那么导致静态Handler初始化,如果在API 16以下,那么会出现上面同样的问题;这就是AsyncTask必须在主线程初始化 的原因。
    事实上,在Android 4.1(API 16)以后,在APP主线程ActivityThread的main函数里面,直接调用了AscynTask.init
    函数确保这个类是在主线程初始化的;另外,init这个函数里面获取了InternalHandler
    的Looper,由于是在主线程执行的,因此,AsyncTask的Handler用的也是主线程的Looper。这个问题从而得到彻底的解决。
    AsyncTask是并行执行的吗?
    现在知道AsyncTask内部有一个线程池,那么派发给AsyncTask的任务是并行执行的吗?
    答案是不确定。在Android 1.5刚引入的时候,AsyncTask的execute
    是串行执行的;到了Android 1.6直到Android 2.3.2,又被修改为并行执行了,这个执行任务的线程池就是THREAD_POOL_EXECUTOR
    ,因此在一个进程内,所有的AsyncTask都是并行执行的;但是在Android 3.0以后,如果你使用execute
    函数直接执行AsyncTask,那么这些任务是串行执行的;(你说蛋疼不)源代码如下:
    123

    public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params);}

    这个sDefaultExecutor
    就是用来执行任务的线程池,那么它的值是什么呢?继续看代码:
    1

    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

    因此结论就来了:Android 3.0以上,AsyncTask默认并不是并行执行的
    为什么默认不并行执行?
    也许你不理解,为什么AsyncTask默认把它设计为串行执行的呢?
    由于一个进程内所有的AsyncTask都是使用的同一个线程池执行任务;如果同时有几个AsyncTask一起并行执行的话,恰好AysncTask的使用者在doInbackgroud
    里面访问了相同的资源,但是自己没有处理同步问题;那么就有可能导致灾难性的后果!
    由于开发者通常不会意识到需要对他们创建的所有的AsyncTask对象里面的doInbackgroud
    做同步处理,因此,API的设计者为了避免这种无意中访问并发资源的问题,干脆把这个API设置为默认所有串行执行的了。如果你明确知道自己需要并行处理任务,那么你需要使用executeOnExecutor(Executor exec,Params... params)
    这个函数来指定你用来执行任务的线程池,同时为自己的行为负责。(处理同步问题)
    实际上《Effective Java》里面有一条原则说的就是这种情况:不要在同步块里面调用不可信的外来函数。这里明显违背了这个原则:AsyncTask这个类并不知道使用者会在doInBackgroud
    这个函数里面做什么,但是对它的行为做了某种假设。
    如何让AsyncTask并行执行?
    正如上面所说,如果你确定自己做好了同步处理,或者你没有在不同的AsyncTask里面访问共享资源,需要AsyncTask能够并行处理任务的话,你可以用带有两个参数的executeOnExecutor
    执行任务:
    1234567

    new AsyncTask<Void, Void, Vo @Override protected Void doInBackground(Void... params) { // do something return null; }}.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);

    更好的AsyncTask
    从上面的分析得知,AsyncTask有如下问题:
    默认的AsyncTask如果处理的任务过多,会导致程序直接崩溃;
    AsyncTask类必须在主线程初始化,必须在主线程创建,不然在API 16以下很大概率崩溃。
    如果你曾经使用过AsyncTask,以后不用了;在Android 4.4以下,进程内也默认有5个AsyncTask线程;在Android 4.4以上,默认有CPU + 1
    个线程。
    Android 3.0以上的AsyncTask默认是串行执行任务的;如果要并行执行需要调用低版本没有的API,处理麻烦。

    因此我们对系统的AsyncTask做了一些修改,在不同Android版本提供一致的行为,并且提高了使用此类的安全性,主要改动如下:
    添加对于任务过多导致崩溃的异常保护;在这里进行必要的数据统计上报工作;如果出现这个问题,说明AsyncTask不适合这种场景了,需要考虑重构;
    移植API 22对于Handler的处理;这样就算在线程创建异步任务,也不会有任何问题;
    提供串行执行和并行执行的execute
    方法;默认串行执行,如果明确知道自己在干什么,可以使用executeParallel
    并行执行。
    在doInbackgroud
    里面频繁崩溃的地方加上try..catch
    ;自己处理数据上报工作。

    完整代码见gist,BetterAsyncTask
    原文地址:http://weishu.me/2016/01/18/dive-into-asynctask/


    大体流程

    • excute()方法中首先直接调用preExcute()方法
    • AsyncTask的构造方法构造了mWorkder这个WorkerRunnable对象.再用mWorker构造一个FutureTask对象丢到线程池里面去执行
    • AsyncTask的成员变量有个InternalHandler,构造的时候进行初始化.
    • FutureTask转调mWorker的run方法最后在线程池的子线程调用doInBackGround()方法.
    • 然后FutureTask调用setResult()方法把运行结果返回,完成后调用FutureTask的done方法.这里面用InternalHandler发了个消息给主线程,最后拿到结果调用 finish()

    • FutureTask类
      它需要Callable接口类型的参数,在AsyncTask类中,创建了WorkerRunnable的实现类和FutureTask类,在run方法中调用
    private static abstract class WorkerRunnable<Params, Result> 
                                    implements Callable<Result> {
        Params[] mParams;
    }
        
        
        
    public AsyncTask() {
       mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                //调用熟悉的doInBackground方法,在子线程中执行,
                return postResult(doInBackground(mParams));
            }
        };
    
        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occured 
    while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }
    
    //执行execute方法时
    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        onPreExecute();
        mWorker.mParams = params;
        //将任务添加到Executor的实现类ThreadPoolExecutor里面
        exec.execute(mFuture);
    }
    
    //FutureTask任务
    public class FutureTask<V> implements RunnableFuture<V>{
        private volatile int state;//任务运行的状态
        //构造函数java.util.concurrent.Callable
        public FutureTask(Callable<V> callable) {
            
        }
    
        //取消任务通过线程执行interrupt
    
        public boolean cancel(boolean mayInterruptIfRunning) {
             try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
    
        }
        
        //此方法调用awaitDone,它会阻塞线程,一直等到线程执行完成
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
        
        //变量c就是构造函数传的Callable<V> callable对象
        public void run() {
           if (c != null && state == NEW) {
                V result;
                result = c.call();
                //call里面的任务执行完毕之后,在set方法里面调用finishCompletion方法,
                  之后调用done方法表示任务执行完成。
                set(result);
            }
        }
        
        //消除和信号,所有等待的线程,调用done()
        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
            done();
            callable = null;        // to reduce footprint
        }
        
        //异步任务执行完成,回调
        protected void done() { 
        
        }
    }
    

    ThreadPoolExecutor

    任务执行在这个类里面用到了ReentrantLock锁,实现了ExecutorService接口。

    //addWorker方法第二个参数core含义:true表示核心线程5,false表示非核心线程128
    public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            //调用handler.rejectedExecution抛出异常
            reject(command);
    }
    
     private boolean addWorker(Runnable firstTask, boolean core) {
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            for (;;) {
                int wc = workerCountOf(c);//当前work线程数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        Worker w = new Worker(firstTask);
        Thread t = w.thread;
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            workers.add(w);
        } finally {
            mainLock.unlock();
        }
    
        t.start();
        return true;
    }
    
    

    InternalHandler

    任务执行完时在子线程,此时会发送一个handler消息,handler接收到这个消息就会调用AsyncTask的finish方法,接着调用onPostExecute。

    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }
    
    private static class InternalHandler extends Handler {
        public void handleMessage(Message msg) {
            AsyncTaskResult result = (AsyncTaskResult) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }
    

    异步任务执行的结果,主线程无法轻易的获取

    Java FutureTask 异步任务操作提供了便利性

    1.获取异步任务的返回值
    2.监听异步任务的执行完毕
    3.取消异步任务

    doBackground(call)
    call的返回值在Future的done方法中获取

    ->onPostExecute

    new MyTask().execute();

    实例化:
    new AsyncTask() -> new FutureTask()

    执行:
    Executor.execute(mFuture) -> SerialExecutor.myTasks(队列)
    -> (线程池)THREAD_POOL_EXECUTOR.execute

    线程池中的所有线程,为了执行异步任务

    CORE_POOL_SIZE 核心线程数
    MAXIMUM_POOL_SIZE 最大线程数量
    KEEP_ALIVE 1s闲置回收
    TimeUnit.SECONDS 时间单位
    sPoolWorkQueue 异步任务队列
    sThreadFactory 线程工厂


    如果当前线程池中的数量小于corePoolSize,创建并添加的任务。
    如果当前线程池中的数量等于corePoolSize,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。
    如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。
    如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。
    当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。

    public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

    1.线程池容量不够抛出异常
    2.内存泄露
    3.一个线程,一个异步任务(?)

    测试代码

    public class FutureTest1 {
    
        public static void main(String[] args) {
            System.out.println("main ID: " + Thread.currentThread().getId());
            Task work = new Task();
            FutureTask<Integer> future = new FutureTask<Integer>(work) {
                // 异步任务执行完成,回调
                @Override
                protected void done() {
                    try {
                        System.out.println("done:" + get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
    
            };
            // 线程池(使用了预定义的配置)
            ExecutorService executor = Executors.newCachedThreadPool();
            executor.execute(future);
    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            // 取消异步任务
            // future.cancel(true);
    
            try {
                // 阻塞,等待异步任务执行完毕
                System.out.println(future.get()); // 获取异步任务的返回值
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        // 异步任务
        static class Task implements Callable<Integer> {
    
            // 返回异步任务的执行结果
            @Override
            public Integer call() throws Exception {
                int i = 0;
                for (; i < 10; i++) {
                    try {
                        System.out.println(Thread.currentThread().getName() + "_"
                                + i);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("ID: " + Thread.currentThread().getId());
    
                return i;
            }
    
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:AsyncTask原理分析

          本文链接:https://www.haomeiwen.com/subject/rfmjvttx.html