Android 中的“子线程”解析

作者: SheHuan | 来源:发表于2018-07-27 10:07 被阅读72次

    Android 中线程可分为主线程子线程两类,其中主线程也就是UI线程,它的主要这作用就是运行四大组件、处理界面交互。子线程则主要是处理耗时任务,也是我们要重点分析的。

    首先 Java 中的各种线程在 Android 里是通用的,Android 特有的线程形态也是基于 Java 的实现的,所以有必要先简单的了解下 Java 中的线程,本文主要包括以下内容:

    • Thread、Runnable
    • Callable、Future
    • 线程池
    • IntentService、HandlerThread
    • AsyncTask

    一、Thread、Runnable

    在 Java 中要创建子线程可以直接继承Thread类,重写run()方法:

    public class MyThread extends Thread {
            @Override
            public void run() {
                
            }
        }
    // 启动线程
    new MyThread().start();
    

    或者实现Runnable接口,然后用Thread执行Runnable,这种方式比较常用:

    public class MyRunnable implements Runnable {
            @Override
            public void run() {
    
            }
        }
    // 启动线程
    new Thread(new MyRunnable()).start();
    

    简单的总结下:

    • Runnable 可以实现多个线程共享资源,可以参考网上卖票的例子
    • Runnable 可以避免 Java 中的单继承的限制
    • 无法直接得到任务的执行结果

    二、Callable、Future

    CallableRunnable类似,都可以用来处理具体的耗时任务逻辑的,但是但具体的差别在哪里呢?看一个小例子:

    定义 MyCallable 实现了 Callable接口,和之前Runnablerun()方法对比下,call()方法是有返回值的哦,泛型就是返回值的类型:

    public class MyCallable implements Callable<String> {
            @Override
            public String call() throws Exception {
                Log.e("call", "task start");
                Thread.sleep(2000);
                Log.e("call", "task finish");
                return "hello thread";
            }
        }
    

    一般会通过线程池来执行Callable(线程池相关内容后边会讲到),执行结果就是一个Future对象:

            // 创建一个线程池
            ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
            // 执行任务
            Future<String> result = cachedThreadPool.submit(new MyCallable());
            try {
                // 获取执行结果
                Log.e("result", result.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    

    可以看到,通过线程池执行 MyCallable 对象返回了一个 Future 对象,取出执行结果。

    Future是一个接口,从其内部的方法可以看出它提供了取消任务(有坑!!!)、判断任务是否完成、获取任务结果的功能:

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    Future接口有一个FutureTask实现类,同时FutureTask也实现了Runnable接口,并提供了两个构造函数:

    public FutureTask(Callable<V> callable) {
    }
    
    public FutureTask(Runnable runnable, V result) {
    }
    

    FutureTask一个参数的构造函数来改造下上边的例子:

    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
    cachedThreadPool.submit(futureTask);
    try {
         Log.e("result", futureTask.get());
    } catch (InterruptedException e) {
         e.printStackTrace();
    } catch (ExecutionException e) {
         e.printStackTrace();
    }
    

    FutureTask内部有一个done()方法,代表Callable中的任务已经结束,可以用来获取执行结果:

    FutureTask<String> futureTask = new FutureTask<String>(new MyCallable()){
                @Override
                protected void done() {
                    super.done();
                    try {
                        Log.e("result", get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            };
    

    所以Future + Callable的组合可以更方便的获取子线程任务的执行结果,更好的控制任务的执行,主要的用法先说这么多了,其实AsyncTask内部也是类似的实现!

    注意,Future并不能取消掉运行中的任务,这点在后边的AsyncTask解析中有提到。

    三、线程池

    Java 中线程池的具体的实现类是ThreadPoolExecutor,继承了Executor接口,这些线程池在 Android 中也是通用的。使用线程池的好处:

    • 方便对线程进行管理
    • 线程复用,避免大量创建、销毁线程带来的性能开销
    • 可控制线程的最大并发数,避免线程之间抢占资源造成阻塞

    常用的构造函数如下:

    public ThreadPoolExecutor(
                    // 线程池的核心线程数,如果设置allowCoreThreadTimeOut属性为true,当闲置时间大于keepAliveTime会被终止掉,否则会一直存活不受keepAliveTime影响
                    int corePoolSize, 
                    // 线程池能容纳的最大线程数,超过该数量的将会被阻塞
                    int maximumPoolSize,
                    // 线程闲置的超时时间
                    long keepAliveTime,
                    // 超时时间的单位
                    TimeUnit unit,
                    // 线程池的任务队列,保存通过execute()提交的Runnable,如果任务队列已满,则后续任务不被执行
                    BlockingQueue<Runnable> workQueue,
                    // 创建新的线程
                    ThreadFactory threadFactory) {
        }
    

    一个常规线程池可以按照如下方式来实现:

    public class ThreadPool {
        // CPU核心数
        private int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        // 可同时下载的任务数(核心线程数)
        private int CORE_POOL_SIZE = CPU_COUNT;
        // 线程池容纳的最大线程数
        private int MAX_POOL_SIZE = 2 * CPU_COUNT + 1;
        // 超时时间
        private long KEEP_ALIVE = 10L;
    
        private ThreadPoolExecutor THREAD_POOL_EXECUTOR;
    
        private ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger();
    
            @Override
            public Thread newThread(@NonNull Runnable runnable) {
                return new Thread(runnable, "download_task#" + mCount.getAndIncrement());
            }
        };
    
        private ThreadPool() {
        }
    
        public static ThreadPool getInstance() {
            return SingletonHolder.instance;
        }
    
        private static class SingletonHolder {
            private static final ThreadPool instance = new ThreadPool();
        }
    
        private ThreadPoolExecutor getThreadPoolExecutor() {
            if (THREAD_POOL_EXECUTOR == null) {
                THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
                        CORE_POOL_SIZE,
                        MAX_POOL_SIZE,
                        KEEP_ALIVE,
                        TimeUnit.SECONDS,
                        new LinkedBlockingDeque<Runnable>(),
                        sThreadFactory);
            }
            return THREAD_POOL_EXECUTOR;
        }
    
        public void execute(Runnable command) {
            getThreadPoolExecutor().execute(command);
        }
    }
    

    执行任务:

    ThreadPool.getInstance().execute(new Runnable() {
                @Override
                public void run() {
                    // do something
                }
            });
    

    基于ThreadPoolExecutor,系统扩展了几类具有新特性的线程池:

    // 创建一个线程数量固定的线程池,核心线程数就是线程池能容纳的最大线程数,同时线程将一直存活,除非线程池被关闭
    // 如果任何线程在执行期间因故障而终止,在关闭之前,如果需要,新线程将取代它执行后续任务,
    // 如果所有线程都处于活动状态,新来的任务需要等待,直到有空闲线程
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool();
    
    // 该线程池只有非核心线程,数量为Integer.MAX_VALUE
    // 按需创建线程,没有空闲线程时会创建新线程,否则复用空闲线程,任何任务都会被立即执行
    // 超时时间为60秒,当线程池长时间闲置时线程都会被终止掉,几乎不占用系统资源
    // 任务队列比较特殊是SynchronousQueue,而非一般的LinkedBlockingQueue
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
    // 创建一个核心线程数固定的线程池,可容纳的最大线程数量为Integer.MAX_VALUE,非核心线程的超时时间为0
    // 主要用于执行定时任务和有固定周期的任务
    ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool();
    
    // 该线程池中只有一个核心线程,所有任务都通过该线程执行
    // 任务之间不用考虑线程同步的问题
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
    // Android api level 24 新加的
    // 会更加所需的并行层次来动态创建和关闭线程,试图减少任务队列的大小,所以比较适于高负载的环境
    // 也比较适用于当执行的任务会创建更多任务,如递归任务
    ExecutorService workStealingPool =  Executors.newWorkStealingPool();
    

    线程池可以通过execute()submit()方法开始执行任务,主要差别从方法的声明就可以看出,由于submit()有返回值,可以方便得到任务的执行结果:

    void execute(Runnable command)
    Future<?> submit(Runnable task)
    <T> Future<T> submit(Callable<T> task)
    <T> Future<T> submit(Runnable task, T result)
    

    要关闭线程池可以使用如下方法:

    void shutdown()
    List<Runnable> shutdownNow()
    

    四、IntentService、HandlerThread

    1、基本使用

    IntentService 是 Android 中一种特殊的 Service,可用于执行后台耗时任务,任务结束时会自动停止,由于属于系统的四大组件之一,相比一般线程具有较高的优先级,不容易被杀死。用法和普通 Service 基本一致,只需要在onHandleIntent()中处理耗时任务即可:

    public class DomainService extends IntentService {
        public DomainService() {
            super("DomainService");
        }
    
        @Override
        protected void onHandleIntent(@Nullable Intent intent) {
            // 处理耗时任务
        }
    
        @Override
        public void onDestroy() {
            super.onDestroy();
        }
    }
    

    至于 HandlerThread,它是 IntentService 内部实现的重要部分,细节内容会在 IntentService 源码中说到。

    2、源码解析

    IntentService 首次创建被启动的时候其生命周期方法onCreate()会先被调用,所以我们从这个方法开始分析:

    @Override
    public void onCreate() {
        super.onCreate();
        HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
        thread.start();
        mServiceLooper = thread.getLooper();
        mServiceHandler = new ServiceHandler(mServiceLooper);
    }
    

    这里出现了 HandlerThread 和 ServiceHandler 两个类,先搞明白它们的作用,以便后续的分析。

    首先看 HandlerThread 的核心实现:

    public class HandlerThread extends Thread {
    
        public HandlerThread(String name) {
            super(name);
            mPriority = Process.THREAD_PRIORITY_DEFAULT;
        }
    
        @Override
        public void run() {
            mTid = Process.myTid();
            Looper.prepare();
            synchronized (this) {
                mLooper = Looper.myLooper();
                notifyAll();
            }
            Process.setThreadPriority(mPriority);
            onLooperPrepared();
            Looper.loop();
            mTid = -1;
        }
    

    首先它继承了 Thread 类,可以当做子线程来使用,并在 run()方法中创建了一个消息循环系统、开启消息循环。

    ServiceHandler 是 IntentService 的内部类,继承了 Handler,具体内容后续分析:

        private final class ServiceHandler extends Handler {
        }
    

    现在回过头来看 onCreate()方法主要是一些初始化的操作, 首先创建了一个 thread 对象,并启动线程,然后用其内部的 Looper 对象 创建一个 mServiceHandler 对象,将子线程的 Looper 和 ServiceHandler 建立了绑定关系,可以在子线程使用 mServiceHandler 来发送处理消息了。

    生命周期方法 onStartCommand()方法会在 IntentService 每次被启动时调用,一般会这里处理启动 IntentService 传递 Intent 解析携带的数据:

        @Override
        public int onStartCommand(@Nullable Intent intent, int flags, int startId) {
            onStart(intent, startId);
            return mRedelivery ? START_REDELIVER_INTENT : START_NOT_STICKY;
        }
    

    又调用了start()方法:

        @Override
        public void onStart(@Nullable Intent intent, int startId) {
            Message msg = mServiceHandler.obtainMessage();
            msg.arg1 = startId;
            msg.obj = intent;
            mServiceHandler.sendMessage(msg);
        }
    

    就是用mServiceHandler发送了一条包含startIdintent的消息,消息的发送还是在主线程进行的,接下来消息的接收、处理就是在子线程进行的:

    private final class ServiceHandler extends Handler {
            public ServiceHandler(Looper looper) {
                super(looper);
            }
            @Override
            public void handleMessage(Message msg) {
                onHandleIntent((Intent)msg.obj);
                stopSelf(msg.arg1);
            }
        }
    

    当接收到消息时,通过onHandleIntent()方法在子线程处理 intent 对象,onHandleIntent()方法执行结束后,通过stopSelf(msg.arg1)等待所有消息处理完毕后终止服务。

    为什么消息的处理是在子线程呢?这里涉及到 Handler 的内部消息机制,简单的说,因为ServiceHandler使用的Looper对象就是在HandlerThread这个子线程类里创建的,并通过Looper.loop()开启消息循环,不断从消息队列(单链表)中取出消息,并执行,截取loop()的部分源码:

    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
             // No message indicates that the message queue is quitting.
             return;
        }
        msg.target.dispatchMessage(msg);
    }
    

    dispatchMessage()方法间接会调用handleMessage()方法,所以最终onHandleIntent()就在子线程中划线执行了,即HandlerThreadrun()方法。

    这就是 IntentService 实现的核心,通过HandlerThread + Hanlder把启动 IntentService 的 Intent 从主线程切换到子线程,实现让 Service 可以处理耗时任务的功能!

    五、AsyncTask

    1、基本使用

    AsyncTask 是 Android 中轻量级的异步任务抽象类,它的内部主要由线程池以及 Handler 实现,在线程池中执行耗时任务并把结果通过 Handler 机制中转到主线程以实现UI操作。典型的用法如下:

        /**
         * 三个泛型参数Params、 Progress、 Result分别表示耗时任务输入参数类型、进度类型、返回的结果类型
         */
        public class DownloadAsyncTask extends AsyncTask<String, Integer, String> {
            /**
             * 在主线程执行,可在耗时任务开始前做一些准备工作
             */
            @Override
            protected void onPreExecute() {
                super.onPreExecute();
                Log.e("onPreExecute", "download prepare");
            }
    
            /**
             * 在线程池中执行耗时任务
             *
             * @param urls 输入参数
             * @return 耗时任务的结果
             */
            @Override
            protected String doInBackground(String... urls) {
                String url = "";
                for (String temp : urls) {
                    // 执行耗时任务
                    try {
                        for (int i = 0; i <= 100; i += 10) {
                            // publishProgress()用来更新任务进度,会调用onProgressUpdate()方法
                            publishProgress(i);
                            Thread.sleep(20);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    url = temp;
                }
    
                return url;
            }
    
            /**
             * 在主线程执行,进度更新时会被调用
             *
             * @param values values[0]代表进度
             */
            @Override
            protected void onProgressUpdate(Integer... values) {
                super.onProgressUpdate(values);
                Log.e("onProgressUpdate", values[0] + "%");
            }
    
            /**
             * 在主线程执行,耗时任务执行结束
             *
             * @param url doInBackground()的返回值
             */
            @Override
            protected void onPostExecute(String url) {
                super.onPostExecute(url);
                Log.e("onPostExecute", url + " download finish");
            }
        }
    

    从 Android3.0 开始,AsyncTask 默认是串行执行的:

    new DownloadAsyncTask().execute("url-1");
    new DownloadAsyncTask().execute("url-2");
    

    如果需要并行执行可以这么做:

    new DownloadAsyncTask().executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, "url-1");
    new DownloadAsyncTask().executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, "url-2");
    
    2、源码解析

    AsyncTask 的源码不多,还是比较容易理解的。根据上边的用法,可以从execute()方法开始我们的分析:

        @MainThread
        public final AsyncTask<Params, Progress, Result> execute(Params... params) {
            return executeOnExecutor(sDefaultExecutor, params);
        }
    

    看到@MainThread注解了吗?所以execute()方法需要在主线程执行哦!

    进而又调用了executeOnExecutor()

    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
                Params... params) {
            if (mStatus != Status.PENDING) {
                switch (mStatus) {
                    case RUNNING:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task is already running.");
                    case FINISHED:
                        throw new IllegalStateException("Cannot execute task:"
                                + " the task has already been executed "
                                + "(a task can be executed only once)");
                }
            }
            mStatus = Status.RUNNING;
            onPreExecute();
            mWorker.mParams = params;
            exec.execute(mFuture);
            return this;
        }
    

    可以看到,当任务正在执行或者已经完成,如果又被执行会抛出异常!回调方法onPreExecute()最先被执行了。

    传入的sDefaultExecutor参数,是一个自定义的串行线程池对象,所有任务在该线程池中排队执行:

    private static class SerialExecutor implements Executor {
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            Runnable mActive;
    
            public synchronized void execute(final Runnable r) {
                mTasks.offer(new Runnable() {
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            scheduleNext();
                        }
                    }
                });
                if (mActive == null) {
                    scheduleNext();
                }
            }
    
            protected synchronized void scheduleNext() {
                if ((mActive = mTasks.poll()) != null) {
                    THREAD_POOL_EXECUTOR.execute(mActive);
                }
            }
        }
    

    可以看到SerialExecutor线程池仅用于任务的排队,THREAD_POOL_EXECUTOR线程池才是用于执行真正的任务,就是我们线程池部分讲到的ThreadPoolExecutor

    static {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                    sPoolWorkQueue, sThreadFactory);
            threadPoolExecutor.allowCoreThreadTimeOut(true);
            THREAD_POOL_EXECUTOR = threadPoolExecutor;
        }
    

    再回到executeOnExecutor()方法中,那么exec.execute(mFuture)就是触发线程池开始执行任务的操作了。

    executeOnExecutor()方法中的mWorker是什么?mFuture是什么?答案在 AsyncTask 的构造函数中:

    public AsyncTask(@Nullable Looper callbackLooper) {
            mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
                ? getMainHandler()
                : new Handler(callbackLooper);
    
            mWorker = new WorkerRunnable<Params, Result>() {
                public Result call() throws Exception {
                    mTaskInvoked.set(true);
                    Result result = null;
                    try {
                        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                        //noinspection unchecked
                        result = doInBackground(mParams);
                        Binder.flushPendingCommands();
                    } catch (Throwable tr) {
                        mCancelled.set(true);
                        throw tr;
                    } finally {
                        postResult(result);
                    }
                    return result;
                }
            };
    
            mFuture = new FutureTask<Result>(mWorker) {
                @Override
                protected void done() {
                    try {
                        postResultIfNotInvoked(get());
                    } catch (InterruptedException e) {
                        android.util.Log.w(LOG_TAG, e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException("An error occurred while executing doInBackground()",
                                e.getCause());
                    } catch (CancellationException e) {
                        postResultIfNotInvoked(null);
                    }
                }
            };
        }
    

    原来mWorker是一个Callable对象,mFuture是一个FutureTask对象,继承了Runnable接口。所以mWorkercall()方法会在mFuturerun()方法中执行,所以mWorkercall()方法在线程池得到执行!

    同时doInBackground()方法就在call()中方法,所以我们自定义的耗时任务逻辑得到执行,不就是我们第二部分讲的那一套吗!

    doInBackground()的返回值会传递给postResult()方法:

    private Result postResult(Result result) {
            @SuppressWarnings("unchecked")
            Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                    new AsyncTaskResult<Result>(this, result));
            message.sendToTarget();
            return result;
        }
    

    就是通过 Handler 将最终的耗时任务结果从子线程发送到主线程,具体的过程是这样的,getHandler()得到的就是 AsyncTask 构造函数中初始化的mHandlermHander又是通过getMainHandler()赋值的:

    private static Handler getMainHandler() {
            synchronized (AsyncTask.class) {
                if (sHandler == null) {
                    sHandler = new InternalHandler(Looper.getMainLooper());
                }
                return sHandler;
            }
        }
    

    可以在看到sHandler是一个InternalHandler类对象:

    private static class InternalHandler extends Handler {
            public InternalHandler(Looper looper) {
                super(looper);
            }
    
            @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
            @Override
            public void handleMessage(Message msg) {
                AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
                switch (msg.what) {
                    case MESSAGE_POST_RESULT:
                        // There is only one result
                        result.mTask.finish(result.mData[0]);
                        break;
                    case MESSAGE_POST_PROGRESS:
                        result.mTask.onProgressUpdate(result.mData);
                        break;
                }
            }
        }
    

    所以getHandler()就是在得到在主线程创建的InternalHandler对象,所以
    就可以完成耗时任务结果从子线程到主线程的切换,进而可以进行相关UI操作了。
    当消息是MESSAGE_POST_RESULT时,代表任务执行完成,finish()方法被调用:

    private void finish(Result result) {
            if (isCancelled()) {
                onCancelled(result);
            } else {
                onPostExecute(result);
            }
            mStatus = Status.FINISHED;
        }
    

    如果任务没有被取消的话执行onPostExecute(),否则执行onCancelled()

    如果消息是MESSAGE_POST_PROGRESSonProgressUpdate()方法被执行,根据之前的用法可以onProgressUpdate()的执行需要我们手动调用publishProgress()方法,就是通过 Handler 来发送进度数据:

    protected final void publishProgress(Progress... values) {
            if (!isCancelled()) {
                getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                        new AsyncTaskResult<Progress>(this, values)).sendToTarget();
            }
        }
    

    进行中的任务如何取消呢?AsyncTask 提供了一个cancel(boolean mayInterruptIfRunning),参数代表是否中断正在执行的线程任务,但是呢并不靠谱,cancel()的方法注释中有这么一段:

    Calling this method will result in {@link #onCancelled(Object)} being
    invoked on the UI thread after {@link #doInBackground(Object[])}
    returns. Calling this method guarantees that {@link #onPostExecute(Object)}
    is never invoked. After invoking this method, you should check the
    value returned by {@link #isCancelled()} periodically from
    {@link #doInBackground(Object[])} to finish the task as early as
    possible.

    大致意思就是调用cancel()方法后,onCancelled(Object)回调方法会在doInBackground()之后被执行而onPostExecute()将不会被执行,同时你应该doInBackground()回调方法中通过isCancelled()来检查任务是否已取消,进而去终止任务的执行!

    所以只能自己动手了:

      @Override
      protected String doInBackground(String... urls) {
          .........
          if (isCancelled()){
               // 手动抛出异常,并自己捕获或者直接return
          }
          .........        
      }
    

    AsyncTask 整体的实现流程就这些了,源码是最好的老师,自己跟着源码走一遍有些问题可能就豁然开朗了!

    相关文章

      网友评论

        本文标题:Android 中的“子线程”解析

        本文链接:https://www.haomeiwen.com/subject/xsqqpftx.html