美文网首页知识点Android开发经验谈Android技术知识
Java 线程池之必懂应用-原理篇(上)

Java 线程池之必懂应用-原理篇(上)

作者: 小鱼人爱编程 | 来源:发表于2021-10-10 13:15 被阅读0次

    前言

    线程并发系列文章:

    Java 线程基础
    Java 线程状态
    Java “优雅”地中断线程-实践篇
    Java “优雅”地中断线程-原理篇
    真正理解Java Volatile的妙用
    Java ThreadLocal你之前了解的可能有误
    Java Unsafe/CAS/LockSupport 应用与原理
    Java 并发"锁"的本质(一步步实现锁)
    Java Synchronized实现互斥之应用与源码初探
    Java 对象头分析与使用(Synchronized相关)
    Java Synchronized 偏向锁/轻量级锁/重量级锁的演变过程
    Java Synchronized 重量级锁原理深入剖析上(互斥篇)
    Java Synchronized 重量级锁原理深入剖析下(同步篇)
    Java并发之 AQS 深入解析(上)
    Java并发之 AQS 深入解析(下)
    Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 详解
    Java 并发之 ReentrantLock 深入分析(与Synchronized区别)
    Java 并发之 ReentrantReadWriteLock 深入分析
    Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
    Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(应用篇)
    最详细的图文解析Java各种锁(终极篇)
    线程池必懂系列

    线程池系列文章:

    Java 线程池之线程返回值
    Java 线程池之必懂应用-原理篇(上)
    Java 线程池之必懂应用-原理篇(下)

    Java 线程池是面试、工作必须掌握的基础之一,使用线程池能够更好地规划应用CPU占用率,提高应用运行的流畅度,本篇将来探索线程池的应用与原理。
    通过本篇文章,你将了解到:

    1、为什么需要线程池
    2、自己实现简单线程池
    3、线程池原理
    4、总结

    1、为什么需要线程池

    名词由来

    池,顾名思义:就是装一堆东西的地方。当有需要的时候,从池子里拿东西,当不用的时候,就往池子里放。可以看出存放、取用都很方便。

    线程池类比

    咱们在打疫苗的时候,若是啥时候想打啥时候过去,到了医院,医生需要换上防护服,戴上手套,准备疫苗,准备电脑记录信息等。


    image.png

    接种者1 到了医院,医生进行各项准备工作,最后给接种者1打了疫苗后,卸下装备。此时接种者2也到了医院,医生又需要换上装备。可以看出医生的准备工作耗时费力,若是在固定的时间接种疫苗,医生就无需频繁换装备,既节约医生时间,也缩短了接种者的等待时间。如下图:


    image.png

    为什么需要线程池

    从上面的例子可知,因为医生的准备工作耗时费力,因此尽可能集中打一段时间再换装备。而对于计算机里线程也是类似的:

    1、线程切换需要切换上下文,切换上下文涉及到系统调用,占用系统资源。
    2、线程切换需要时间,成功创建线程后才能真正做事。
    3、线程开启后无法有效管理线程。

    基于以上原因,需要引入线程池。

    2、自己实现简单线程池

    简单Demo

        private void createThread() {
            Thread thread = new Thread(() -> {
                System.out.println("thread running...");
            });
            thread.start();
        }
    

    该线程执行完毕就结束了,现在想让它不结束:

        private void createThread() {
            Thread thread = new Thread(() -> {
                while (true) {
                    System.out.println("thread running...");   
                }
            });
            thread.start();
        }
    

    线程一直在运行着,外部想要提交Runnable 给它运行,那么需要有个共享的变量,选择队列作为共享变量:

    class ThreadPool {
        public static void main(String args[]) {
            ThreadPool threadPool = new ThreadPool();
            threadPool.createThread();
            threadPool.startRunnable();
        }
    
        BlockingQueue<Runnable> shareQueue = new LinkedBlockingQueue<>();
        private void createThread() {
            Thread thread = new Thread(() -> {
                while (true) {
                    try {
                        Runnable targetRunnable = shareQueue.take();
                        targetRunnable.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
        }
    
        public void startRunnable() {
            for (int i = 0; i < 10; i++) {
                int finalI = i;
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("执行 runnable " + finalI);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                System.out.println("放入队列 runnable " + i);
                shareQueue.offer(runnable);
            }
        }
    }
    

    上述Demo里只开启了一个线程,该线程是个死循环。线程体里从共享队列shareQueue 取出Runnable,若有元素则拿出来执行,若没有则阻塞等待。
    而外部调用者可以往共享队列里存放Runnable,等待线程执行该Runnable。
    由此实现了只有一个线程在运行,却是可以执行不同的任务,也避免了每个任务都需要开启线程执行的情况。

    3、线程池原理

    线程池基本构成

    上面虽然实现了线程池,但是是乞丐版的,很明显地看出很多缺陷:

    1、线程一直在运行,不能停下来。
    2、只有一个线程在执行任务,其它任务需要排队。
    3、队列无限膨胀,消耗内存。
    4、其它缺点...

    作为通用的工具库,来看看Java 线程池是如何实现的。
    线程池的核心围绕着一个原子变量:ctl

    #ThreadPoolExecutor.java
        //初始化状态:线程池处在运行状态,当前线程数为0
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        //用来表示线程数的位数个数,为29
        private static final int COUNT_BITS = Integer.SIZE - 3;
        //线程池线程最大个数,(1<<29) - 1
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // 线程池的5种状态
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        //ctl 为int 类型,总共32位,包含了两个值,高3位表示线程池状态,低29位表示线程个数
        //提取线程池状态
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        //获取线程个数
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        //将状态和线程个数存储在int 里
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    image.png

    为什么需要29位表示线程数呢,因为线程池状态有5种,需要3位才能区分这5种状态。

    线程池执行任务

    ThreadPoolExecutor 是线程池最核心的类,实现了Executor 接口,并重写了execute(Runnable) 方法。
    当外部调用者需要线程池执行任务时,只需要调用ThreadPoolExecutor.execute(Runnable)方法即可,线程池里的某个线程体里将会执行Runnable,也即是执行了一次任务。

    #ThreadPoolExecutor.java
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            //获取ctl 变量值
            int c = ctl.get();
            //当前正在运行的线程数没有超过核心线程数
            if (workerCountOf(c) < corePoolSize) {
                //新加入的任务将会新建一个核心线程来执行它
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //如果线程池还在运行,那么就往等待队列里添加任务
            if (isRunning(c) && workQueue.offer(command)) {
                //再次获取ctl 变量值
                int recheck = ctl.get();
                //再次检查线程池是否在运行
                if (! isRunning(recheck) && remove(command))
                    //若没有运行了,那么将任务从队列里移出,移出成功则执行拒绝策略(类似定义的回调)
                    reject(command);
                //若是当前没有线程在运行
                else if (workerCountOf(recheck) == 0)
                    //新增非核心线程执行新的任务,注意此时任务是放在队列里的,因此
                    //这里的第一个参数为null
                    addWorker(null, false);
            }
            //加入队列失败后,尝试直接新建非核心线程执行该任务
            else if (!addWorker(command, false))
                //执行拒绝任务
                reject(command);
        }
    

    可以看出有多次判断ctl的值,这是因为可能存在多个线程同时操作ctl的值,而ctl是AtomicInteger类型,底层使用的是CAS,我们知道CAS是无锁的,因此需要循环判断其状态。
    线程池很多地方都是依赖CAS,可以说理解了CAS就大部分读懂了线程池。
    CAS 可移步查看:Java Unsafe/CAS/LockSupport 应用与原理

    上述代码用流程图表示如下:


    image.png

    线程池几个概念

    上述流程涉及到几个概念:

    1、核心线程、核心线程数
    2、非核心线程
    3、最大线程数
    3、等待队列

    计算机世界很多时候可以在现实世界找到类比关系,还是以打疫苗为例:

    1、某个接种点只有4位医生可以打疫苗,这四位医生可类比4个线程,因为他们是常驻该接种点,因此核心医生-->核心线程
    2、4位医生同时只能接种4个人,当同时来的接种人数超过4人,那么就需要按顺序排队等候,这个等候队伍称为等待队列
    3、某天该接种点突然来了很多待接种者,队伍排得很长,实在排不下去了,于是需要其他地方的医生支援。这些赶过来的医生并非常驻此地,他们支援完成后就要回去了,因此可类比为:非核心线程
    4、核心线程 + 非核心线程 == 最大线程数 (类似接种点里最多接种医生的人数)。
    5、当然,有些接种点医生资源比较紧张,队伍太长了也没其他地方的医生过来支援,一天都打不完,于是剩下的待接种者被告知,今天打不了了你们回去吧,这个过程可类比:拒绝策略。即使有其他医生来支援,但是待接种者还是很多,超过了接种点的接种能力,这个时候新来的待接种者也是不能接种的,还是会被拒绝。

    image.png

    核心线程和非核心线程有啥区别呢?

    1、在没有达到最大核心线程数的时候,有新的任务到来都会尝试新建核心线程来执行新任务。
    2、核心线程常驻线程池,除非设置了超时销毁,否则一直等待执行新的任务。
    3、非核心线程执行完毕后,不管是否设置了超时销毁,只要没有任务执行了,就会退出线程执行。

    实际上线程池里淡化了线程本身的概念,只关注任务是否得到执行,而不关注任务被哪个线程执行,因此核心线程、非核心线程最主要的区别在于是否常驻线程池。
    核心线程、等待队列、非核心线程 三者执行任务顺序:


    image.png

    线程池管理线程

    上面提到过一个重要的方法:addWorker(xx)。

    #ThreadPoolExecutor.java
        private boolean addWorker(Runnable firstTask, boolean core) {
            //跳转标记,java里一般很少用,c/c++用得比较多
            retry://--------------------(1)
            for (;;) {
                //死循环,主要是用来判断线程池状态以及线程个数
                int c = ctl.get();
                //取出线程池状态
                int rs = runStateOf(c);
    
                //如果线程池已经关闭/关闭中,则无需再往里边添加了
                if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                                firstTask == null &&
                                ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //取出线程个数
                    int wc = workerCountOf(c);
                    //当前需要开启核心线程数,而核心线程数已经满了(其它线程成功添加了任务到核心线程)
                    //或者需要开启非核心线程,但是超出了总的线程数
                    //这两种情况下,将不能再添加任务到线程池
                    if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //CAS 修改ctl,增加线程个数计数
                    //如果不成功(被别人改了),则继续循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    //成功增加计数
                    c = ctl.get();  // Re-read ctl
                    //状态发生改变了,则再重试
                    if (runStateOf(c) != rs)
                        //跳转到开头的标记
                        continue retry;
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //创建Worker,将任务添加到Worker里,并创建线程(new Thread)
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    //线程创建成功
                    final ReentrantLock mainLock = this.mainLock;//-----------------(2)
                    //上锁
                    mainLock.lock();
                    try {
                        //获取线程池状态
                        int rs = runStateOf(ctl.get());
                        if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive())
                                throw new IllegalThreadStateException();
                            //添加worker到HashSet里
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            //标记添加成功
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //正式开启线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            //线程池添加任务成功的标记
            return workerStarted;
        }
    

    该方法有两个参数,第一个参数为Runnable,第二个为指示要添加的线程是否为核心线程。
    上述标记了两个重点:
    (1)
    为啥需要死循环?因为ctl变量可能存在多个线程修改的情况,而ctl 为AtomicInteger,底层为CAS,因此需要多次判断直至条件满足为止。

    (2)
    为啥需要锁?因为worker声明如下:

        private final HashSet<Worker> workers = new HashSet<>();
    

    该集合可能被多个线程操作,因此其添加(add)、删除(remove)需要上锁后才能安全操作。
    Worker 继承自AQS,利用AQS 实现了自己的一套锁(类似ReentrantLock)。
    其存放的主要信息如下:


    image.png

    线程对象存储在workers里,而线程池通过管理workers来控制核心/非核心线程数。

    线程池管理任务

    线程已经被创建,接下来看看如何来执行任务。
    Worker 实现了Runnable接口,因此必须要重写run()方法,当构造Thread 对象时传入了Worker作为Thread的Runnable,因此当Thread.start()后实际执行的Runnable为Worker里的run()方法,而该方法调用了runWorker(xx)。
    来看看其源码:

    #ThreadPoolExecutor.java
        final void runWorker(Worker w) {
            //当前线程
            Thread wt = Thread.currentThread();
            //取出任务
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //两个条件------------>(1)
                while (task != null || (task = getTask()) != null) {
                    //worker 上锁
                    w.lock();------------>(2)
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        //如果线程池已经被停止运行,并且该线程没有被中断过
                        //那么中断该线程
                        wt.interrupt();
                    try {
                        //执行任务前的回调(钩子)
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //真正执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //执行任务后的回调
                            afterExecute(task, thrown);
                        }
                    } finally {
                        //置空
                        task = null;
                        //该线程完成任务计数
                        w.completedTasks++;
                        //释放锁
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                //线程退出执行
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    上面标记出了两个重点:
    (1)
    在最开始的自己实现线程池Demo里,我们有演示过如何让线程一直在运行,而此处有两个判断条件:

    1、当前worker关联的第一个任务是否存在,若是则取出运行。这种判断对应于新增核心线程/非核心线程(非存放在队列里)的场景。也就是说,当线程开启后,若是关联了任务则拿出执行。
    2、当线程没有关联到第一个任务,这种判断对应于线程不是第一次执行或是线程第一次执行(有任务在队列里),此时从队列里取出任务执行。

    getTask()方法如下:

        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            //死循环为了判断线程池状态
            for (;;) {
                int c = ctl.get();
                //取出状态
                int rs = runStateOf(c);
                //线程池关闭或是没有任务在等待
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    //减少线程计数
                    decrementWorkerCount();
                    return null;
                }
                int wc = workerCountOf(c);
                //判断是否需要超时等待
                //1、allowCoreThreadTimeOut 外部设置了是否允许超时关闭核心线程
                //2、当前线程数是否大于核心线程数(也就是开启了非核心线程数)
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                //总共有四种组合判断
                //timeout 指的是获取队列元素是否已经发生超时
                if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    //若是超时发生,或者队列为空,那么尝试减少线程计数
                    //修改计数成功,则返回null
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    //决定是否超时获取元素
                    //若是设定了超时获取元素,那么当超时时间耗尽后,poll方法将会返回,底层使用LockSupport
                    //若没有设定超时,那么一直阻塞直到队列有元素为止
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
                    if (r != null)
                        return r;
                    //走到这超时了
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    可以看出:

    1、借助于阻塞队列的特性,在没有设置超时的情况下,核心线程将一直会阻塞在BlockingQueue.take()方法,进而线程一直被阻塞于此,直到有新的任务被放入线程池请求执行。这也是为什么核心线程能够常驻线程池的原因。
    2、当线程数超过了核心线程数,那么timed=true,也就是此时线程是获取队列元素时设置了超时的,若超时过了没有获取到元素,那么就会退出线程执行。这也是为什么非核心线程不能常驻线程池的原因。

    (2)
    此处为什么需要锁?
    前面提到过操作wokers集合需要锁:mainLock,它的目的是为了多线程对集合进行添加/删除操作的安全性进行考虑。每个woker同时只能由一个线程操作,似乎没有加锁的必要,其实Worker锁更重要的作用体现在:

    用来巧妙地判断线程是否在运行中(忙碌中)。
    当有任务执行的时候,Runnable.run()方法被上锁,执行结束后释放锁。因此当判断Woker没有获取锁时,表明它正在等待获取队列的元素,此时它是空闲的。

    判断了线程的空闲与否可以用来给外部中断线程池的执行提供依据。

    4、总结

    至此,我们了解线程池的核心优势:不频繁重复创建线程。依靠阻塞队列特性使得线程常驻,用图表示如下:


    image.png

    由于篇幅原因,剩下的线程池关闭、线程池一些重要的运行时状态、简单的创建线程池的几种方式等将在下篇分析,敬请关注!

    演示代码 若是有帮助,给github 点个赞呗~

    您若喜欢,请点赞、关注,您的鼓励是我前进的动力

    持续更新中,和我一起步步为营系统、深入学习Android/Java

    相关文章

      网友评论

        本文标题:Java 线程池之必懂应用-原理篇(上)

        本文链接:https://www.haomeiwen.com/subject/hfsholtx.html