美文网首页AndroidAndroid开发Android开发
(3)线程系列 - 一起解读ThreadUtils源码

(3)线程系列 - 一起解读ThreadUtils源码

作者: zhongjh | 来源:发表于2021-05-24 00:28 被阅读0次

开发第三方库的时候往往不会用到rxjava,会使用Thread来处理线程。那么就直接开门见山,让我们一步一步解读ThreadUtils源码。作者阅读源码习惯是先从public方法一个一个硬读下去,再配合使用方法来使用。

  • public static boolean isMainThread()
    这个方法很简单,调用Android原生代码Looper来判断是否是主线程,关于Android代码Looper在第四章上会有详解

  • public static Handler getMainHandler()
    代码是直接返回HANDLER,而这个HANDLER是一个全局静态主线程的变量 new Handler(Looper.getMainLooper());

  • public static void runOnUiThread(final Runnable runnable)
    在ui主线程上执行事件,会先通过Looper.myLooper()判断当前线程是否是ui主线程,如果是直接运行事件,如果不是,则通过上面的HANDLER来运行事件

  • public static ExecutorService getFixedPool(@IntRange(from = 1) final int size)
    该方法返回一个线程池,size参数则是线程池中线程数量,从注释和方法名中可以知道该方法FixedPool是个对线程数做限制的线程池,用于并发压力的场景下,那么我们沿着这个方法看看怎么实现创建线程池的

  • private static ExecutorService getPoolByTypeAndPriority(final int type)
    getFixedPool方法调用了该方法,该方法运行代码getPoolByTypeAndPriority(type, Thread.NORM_PRIORITY);Thread.NORM_PRIORITY是个优先级,优先级是5,而线程中是1-10,优先级数字越大则是更优先,为什么getFixedPool方法的size参数到这个方法变成了type参数了呢?因为其他type参数都是负数的,而这个传递必须是1以上,所以这个设置xx数量的线程方法,已经自动归纳为另一个类型了。

  • private static ExecutorService getPoolByTypeAndPriority(final int type, final int priority)
    该方法用到了全局静态Map<Integer, Map<Integer, ExecutorService>> TYPE_PRIORITY_POOLS,是个通过类型存储线程池的map,完整代码注释如下

    /**
     * 根据类型获取线程池
     * @param type 类型
     * @param priority 优先级
     * @return 线程池
     */
    private static ExecutorService getPoolByTypeAndPriority(final int type, final int priority) {
        // 同步map安全,防止线程不安全创建多个 Map线程池
        synchronized (TYPE_PRIORITY_POOLS) {
            ExecutorService pool;
            // 通过类型获取 Map线程池
            Map<Integer, ExecutorService> priorityPools = TYPE_PRIORITY_POOLS.get(type);
            if (priorityPools == null) {
                // 如果没有 Map线程池 则新建一个
                priorityPools = new ConcurrentHashMap<>();
                pool = ThreadPoolExecutor4Util.createPool(type, priority);
                // 加入线程池
                priorityPools.put(priority, pool);
                // 新建后加入 Map线程池
                TYPE_PRIORITY_POOLS.put(type, priorityPools);
            } else {
                // 根据线程池优先级获取线程池
                pool = priorityPools.get(priority);
                // 如果没有该线程池,则创建新的线程池
                if (pool == null) {
                    pool = ThreadPoolExecutor4Util.createPool(type, priority);
                    priorityPools.put(priority, pool);
                }
            }
            return pool;
        }
    }

从上面代码可知Map TYPE_PRIORITY_POOLS里面包含了Map ExecutorService,
也就是说线程池包含进了两层map,最外的第一层是key为线程池类型的,里面的第二层是key为优先级的。

  • ThreadPoolExecutor4Util.createPool
    在上面代码注释中可知这句核心关键代码,创建线程池,下面详细阶段这段代码

  • static final class ThreadPoolExecutor4Util extends ThreadPoolExecutor
    该类继承了ThreadPoolExecutor来创建线程池,分别创建了以下几种线程,跟rxjava是不是有点像呢

        /**
         * 创建线程池
         * @param type 类型
         * @param priority 优先级
         * @return 线程池
         */
        private static ExecutorService createPool(final int type, final int priority) {
            switch (type) {
                case TYPE_SINGLE:
                    // 创建 核心线程数为1,线程池最大线程数量为1,非核心线程空闲存活时长为0
                    // 只创建一个线程确保 顺序执行的场景,并且只有一个线程在执行
                    return new ThreadPoolExecutor4Util(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue4Util(),
                            new UtilsThreadFactory("single", priority)
                    );
                case TYPE_CACHED:
                    // 创建 核心线程数为0,线程池最大线程数量为128,非核心线程空闲存活时长为60秒
                    // 线程数为128个一般用于处理执行时间比较短的任务
                    return new ThreadPoolExecutor4Util(0, 128,
                            60L, TimeUnit.SECONDS,
                            new LinkedBlockingQueue4Util(true),
                            new UtilsThreadFactory("cached", priority)
                    );
                case TYPE_IO:
                    // 创建 核心线程数为可计算资源*2+1,线程池最大线程数量为可计算资源*2+1,非核心线程空闲存活时长为30秒
                    return new ThreadPoolExecutor4Util(2 * CPU_COUNT + 1, 2 * CPU_COUNT + 1,
                            30, TimeUnit.SECONDS,
                            new LinkedBlockingQueue4Util(),
                            new UtilsThreadFactory("io", priority)
                    );
                case TYPE_CPU:
                    // 创建 核心线程数为可计算资源+1,线程池最大线程数量为可计算资源*2+1,非核心线程空闲存活时长为30秒
                    return new ThreadPoolExecutor4Util(CPU_COUNT + 1, 2 * CPU_COUNT + 1,
                            30, TimeUnit.SECONDS,
                            new LinkedBlockingQueue4Util(true),
                            new UtilsThreadFactory("cpu", priority)
                    );
                default:
                    // 创建 核心线程数、线程池最大数量为自定义的,空闲存活时长为0
                    return new ThreadPoolExecutor4Util(type, type,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue4Util(),
                            new UtilsThreadFactory("fixed(" + type + ")", priority)
                    );
            }
        }

除了创建,还重写了afterExecute方法和execute方法,不过这两个方法暂时没什么意义所以先不管。

在线程系列第二节讲到了创建线程池需要的ThreadFactory threadFactory(线程工厂)、BlockingQueue workQueue(任务队列),该ThreadUtils类自定义了线程工厂和任务队列,所以我们先了解该两类

  • private static final class LinkedBlockingQueue4Util extends LinkedBlockingQueue<Runnable>
    我们先了解LinkedBlockingQueue类,LinkedBlockingQueue这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了 maximumPoolSize 的设定失效,因为总线程数永远不会超过 核心线程数。
    该类重写了offer方法,如果看过线程池的源码的同学,会发现如下图:


    是的,执行pool.execute后会执行队列的offer方法,那为什么重写offer方法呢,通过上面的解释可以知道,LinkedBlockingQueue的总线程数永远不会超过核心线程数,通过重写它,可以自定义TYPE_CACHED类型的线程池的核心线程为0,总线程数128。
  • static final class UtilsThreadFactory extends AtomicLong implements ThreadFactory
    线程工厂类,该类重写只是简单的创建了一个Thread返回

那么在这里我们基本知道了创建线程池的流程了,接下来,是我们调用该工具类时调用线程的整个流程是怎样的,通过断点调试,我们是最容易了解这个过程的,如图:


调用线程的整个流程
  • onClicktestSingle都是app上业务使用的方法
  • public static <T> void executeByCached(final BaseTask<T> baseTask)
    关键代码是直接创建了一个任务BaseTask传递到线程池使用,那么我们看看BaseTask方法,已经针对run方面的代码进行了全部注释,更多的注释可以下载源码观看
        @Override
        public void run() {
            // 判断是否循环计划内的
            if (isSchedule) {
                // 因为如果是在循环内的,那么runner还是之前的
                if (runner == null) {
                    // 判断当前状态如果是New,便赋值state=RUNNING,如果不是New,便返回
                    if (!state.compareAndSet(NEW, RUNNING)) {
                        return;
                    }
                    // 获取当前线程
                    runner = Thread.currentThread();
                    if (mTimeoutListener != null) {
                        Log.w("ThreadUtils", "Scheduled task doesn't support timeout.");
                    }
                } else {
                    // 如果不是RUNNING便直接返回
                    if (state.get() != RUNNING) {
                        return;
                    }
                }
            } else {
                // 判断当前状态如果是New,便赋值state=RUNNING,如果不是New,便返回
                if (!state.compareAndSet(NEW, RUNNING)) {
                    return;
                }
                // 获取当前线程
                runner = Thread.currentThread();
                if (mTimeoutListener != null) {
                    // 实例化 循环或延迟任务的线程池
                    mExecutorService = new ScheduledThreadPoolExecutor(1, (ThreadFactory) Thread::new);
                    // 调用了延迟运行任务
                    mExecutorService.schedule(new TimerTask() {
                        @Override
                        public void run() {
                            if (!isDone() && mTimeoutListener != null) {
                                timeout();
                                mTimeoutListener.onTimeout();
                            }
                        }
                    }, mTimeoutMillis, TimeUnit.MILLISECONDS);
                }
            }
            try {
                // 执行doInBackground方法获取值
                final T result = doInBackground();
                // 判断是否循环计划内的
                if (isSchedule) {
                    // 如果不是RUNNING便直接返回
                    if (state.get() != RUNNING) {
                        return;
                    }
                    getDeliver().execute(() -> onSuccess(result));
                } else {
                    // 判断当前状态如果是RUNNING,便赋值state=COMPLETING,如果不是RUNNING,便返回
                    if (!state.compareAndSet(RUNNING, COMPLETING)) {
                        return;
                    }
                    // 执行成功方法,getDeliver()已经封装了跳转ui线程
                    getDeliver().execute(() -> {
                        onSuccess(result);
                        onDone();
                    });
                }
            } catch (InterruptedException ignore) {
                // 被中断了,判断当前状态如果是CANCELLED,便赋值state=INTERRUPTED
                state.compareAndSet(CANCELLED, INTERRUPTED);
            } catch (final Throwable throwable) {
                // 如果出现异常了,判断当前状态如果是RUNNING,便赋值EXCEPTIONAL
                if (!state.compareAndSet(RUNNING, EXCEPTIONAL)) {
                    return;
                }
                // 执行成功方法,getDeliver()已经封装了跳转ui线程
                getDeliver().execute(() -> {
                    onFail(throwable);
                    onDone();
                });
            }
        }
  • cancel
    ThreadUtils的作者没明说Activity销毁是否要写cancel事件

从上面整个流程可以几个核心方法,我概括出来:

  1. 拥有者跟rxjava类似的doInBackground,onSucces等等方法
  2. 通过源码得知有延时、循环周期运行线程等方法
  3. 根据使用方式不一样来决定io,cpu线程数量,缓存时间,是否使用队列

相关文章

网友评论

    本文标题:(3)线程系列 - 一起解读ThreadUtils源码

    本文链接:https://www.haomeiwen.com/subject/nrajjltx.html