美文网首页Java 并发编程
Java并发编程(三):线程池

Java并发编程(三):线程池

作者: yeonon | 来源:发表于2018-12-09 16:50 被阅读0次

    1 概述

    线程池即包含一个或者多个线程的集合,类似的还有对象池,数据库连接池,他们本质上都是一样的,只是因为集合里的元素类型不同,所以名字不同而已。之所以要使用线程池这种模式,是因为创建线程是有一定开销的,如果在线程使用频繁且线程生命周期不长的场景(例如Web环境下,一个请求响应的生命周期可能非常短)下,创建线程、销毁线程的开销绝对不容忽视,线程池可以重用线程,当线程处理完任务之后不会直接销毁,而是根据某种策略来决定是应该销毁还是将线程重新放入池中,以此降低线程创建和销毁的开销。

    Java里的线程池与工作队列是密切相关的,在工作队列里保存了所有等待执行的任务,当有线程空闲的时候就会从工作队列里取出一个任务,执行任务,执行完毕后返回线程池,等待下一个任务。ThreadPoolExecutor类即线程池的实现类,该类实现了Executor接口,所以属于Executor框架里的一员,Executor框架是一个任务执行的抽象,目的是提供一种将“任务提交”和“任务执行”分离的机制。下面是ThreadPoolExecutor类的继承体系:

    igW01I.png

    2 线程池的使用

    在J.U.C包下有一个Executors类,该类包含了很多和线程池有关静态工厂方法,例如newFixedThreadPool,newWorkStealingPool等,通过这些静态工厂方法,我们可以很方便的使用线程池。下面是一个示例:

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            executorService.execute(() -> {
                System.out.println("hello, world");
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1000, TimeUnit.SECONDS);
    }
    

    代码中首先调用Executors.newFixedThreadPool()方法获取了一个ExecutorService实例,这个ExecutorService实例可以说是使用Executor框架的基础,很多操作例如execute(),submit(),shutdown()等都是通过该类的对象来操作的。在调用newFixedThreadPool()的时候,传入了一个参数4,这里代表线程池中有4个线程(但实际上,并不是那么简单,稍后介绍到ThreadPoolExecutor源码的时候,会详细介绍构造函数中各个参数的意义),然后调用了4次execute方法,其实就是提交了4个任务的意思(这里的4只是我随意设的,实际上和newFixedThreadPool的参数没有什么直接关系,完全可以设置成100,意思就是提交100个任务),提交任务之后,Executor框架会根据线程池的配置以及执行策略来执行任务。

    在把任务提交完之后(并不意味着任务也执行完了),可以调用shutdown()方法,该方法的功能是让Executor框架停止接受新的任务,最后调用awaitTermination()方法,该方法是一个阻塞方法,会阻塞当前线程,当所有的任务都执行完毕或者设置的时间到期之后,线程会被唤醒,继续执行代码。

    这是最基本的线程池使用方法,直接使用Executors类的静态工厂方法获取ExecutorService实例,下面简单介绍一下该类下和线程池有关的几个静态工厂方法:

    • newCachedThreadPool,创建一个带有缓冲功能的线程池,线程池的最大容纳量是Integer.MAX_VALUE,但如果没有任务提交的时候,并不会把线程存在线程池里,比较适合线程使用频繁的场景。
    • newFixedThreadPool,创建一个固定容量的线程池,线程的最大容量即传入的参数值,在没有任务提交的情况下,线程会被保存在池里,使用场景比较宽泛。
    • newScheduledThreadPool,创建一个可以调度的线程池,例如每3s执行一次任务等。
    • newSingleThreadExecutor,创建只有一个线程的线程池。
    • newSingleThreadScheduledExecutor,创建只有一个线程的线程池,该线程池具有调度的功能。
    • newWorkStealingPool,创建一个具有工作窃取功能的线程池,工作窃取是一种高效的模式,例如A线程很忙,B线程很闲,B线程就可以从与A线程绑定的队列末尾取出任务并执行,属于一种非公平的模式,但CPU利用率有很大的提升,不会出现“一核有难,七核围观”的情况。Java7出现的Fork-join框架非常依赖这种模式。

    读者朋友可以自己试试这些个方法,体验一下有什么不同,这里我就不再逐一举例说明了。

    3 ThreadPoolExecutor类

    这一节是本文最核心,最重要的部分,也是最复杂的部分(我写的时候都觉得难以下笔)。主要介绍三个方面:

    1. ThreadPoolExecutor的构造函数及其各个参数的具体意义。
    2. ThreadPoolExecutor的几个重要方法源码分析。
    3. 如何自定义线程池。

    3.1 ThreadPoolExecutor的构造函数及其各个参数的具体意义

    ThreadPoolExecutor有很多重载的构造函数,但只是参数的个数不同而已,最根本的构造函数是有7个参数的那个重载形式,其他的构造函数最终都会调用这个构造函数,其源码如下所示:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    

    关键是参数的意义,其内部逻辑无非就是检查参数合法性以及对字段进行赋值而已。下面来逐一看看各个参数的意义:

    • corePoolSize,即核心线程数量,也是线程池至少要维护的数量,如果当前运行的线程少于这个值且有新的任务进来,那么就创建一个新的线程直接执行任务。如果当前运行的线程大于这个值,但小于maximumPoolSize,那么该任务会被提交到阻塞队列中,等待被调度,如果阻塞队列满了,才会创建新的线程去执行任务。
    • maximumPoolSize,即线程池最大容量,如果当前运行线程数量等于这个值,即已经达到上限了,而且理论上此时队列应该是已经满了,所以如果再有任务提交,那么该任务会被拒绝,拒绝逻辑根据配置的值可能会不同。
    • keepAliveTime,线程池维护线程所允许的空闲时间,当线程池中的线程大于corePoolSize时,如果此时没有新的任务提交,核心线程之外的线程也不会立即被销毁,而是等待一段时间后,如果仍然没有任务提交才会销毁。这个时间如果设置的合适,可以大大提高线程池性能,如果不合适,可能会造成性能降低,所以要小心设置该值。
    • unit,keepAliveTime设置的时间单位。
    • workQueue,工作队列,必须是BlockingQueue的子类,工作队列主要有两种类型,分别是有界队列和无界队列,两种类似各有各的优缺点,适合的场景也不同,需要慎重选择。
    • threadFactory,线程工厂,线程都是通过线程工程创建的,该参数非常重要。
    • handler,拒绝策略,在介绍maximumPoolSize有提到过如果当前运行的线程数量已经达到maximumPoolSize,新提交的任务会被拒绝,而拒绝的策略就是根据这个参数决定的,JDK默认实现了4中策略,分别是:
      1. AbortPolicy,直接抛出异常,是默认策略。
      2. CallerRunsPolicy,用调用者所在线程来执行任务。
      3. DiscardOldestPolicy,丢弃阻塞队列中最靠前的任务,即最老的任务,然后执行新提交的任务。
      4. DiscardPolicy,直接丢弃任务,也是一个常用的策略。

    大家可以去看看Executors里和线程池有关的方法源码,然后再对照这里的各个参数的意义,可能会有惊喜!

    3.2 ThreadPoolExecutor的几个重要方法源码分析

    阅读源码最重要的就是找到入口点,找到入口点,然后慢慢的跟着走下去,遇到不懂的方法再进到该方法里去看它的源码,这样就不容易迷失在复杂的源码海洋里了。ThreadPoolExecutor.execute就是其中一个入口点,下面就从该方法开始,慢慢阅读源码,揭开ThreadPoolExecutor的神秘面纱!

    下面是execute的源码(由于篇幅限制,我删除了一些注释):

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //clt记录着runState和workerCount,ctl是一个AtomicIneger类型的变量
        int c = ctl.get();
        //workerCountOf()会返回当前活跃的线程数,如果小于corePoolSize就执行内部逻辑
        if (workerCountOf(c) < corePoolSize) {
            //addWorker()会尝试创建一个Worker并启动和其绑定的线程,command就是任务
            //如果成功启动,该方法会返回true,否则返回false
            if (addWorker(command, true))
                return;
            //再次获取ctl值
            c = ctl.get();
        }
        //如果当前线程处于RUNNING状态并且工作队列没满
        //如果队列已经满的话,workQueue.offer()会返回false
        if (isRunning(c) && workQueue.offer(command)) {
            //重新取得ctl值
            int recheck = ctl.get();
            //如果不是处于运行状态,由于之前的addWorker方法会把任务放入队列里,所以调用remove()方法来移除任务,完事之后调用reject来执行拒绝逻辑
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果当前活跃的线程数是0,那么就直接添加任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果能执行到这里,说明线程已经不是RUNNING状态或者工作队列已满(隐含的条件是workerCount >= corePoolSize)。如果此时执行任务失败,就执行拒绝逻辑。
        else if (!addWorker(command, false))
            reject(command);
    }
    

    ig59bV.png

    上图就是execute的逻辑流程图(图是网上找的,在最后我会给出出处)。

    下面是addWorker()的源码:

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                //获取运行状态
                int rs = runStateOf(c);
    
                //SHUTDOWN状态表示不再接受新任务,如果rs >= SHUTDOWN,那就继续后面的判断
                //后面的判断有三个:
                //1. 如果rs == SHUTDOWN,说明处于不再接受任务的状态
                //2. 如果firstTask == null,说明这是空任务
                //3. !workQueue.isEmpty(),如果队列不空,该表达式返回true
                //如果以上三个条件都成立,记得还有最外层的!符号,即要以上三个条件都不成力并且rs >= SHUTDOWN,这整个if才会成立,最后返回false,表示添加worker失败。
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //获取workcount
                    int wc = workerCountOf(c);
                    //如果workcount大于CAPACITY,就直接返回false,表示添加worker失败
                    //或者,如果当前core参数为true,即表示添加核心线程,那么如果workcount大于corePoolSize,那么就不应该创建新线程,所以会返回false,如果当前core为false,即表示添加非核心线程,如果workcount大于maximumPoolSize,那么也不应该继续创建线程,最后返回false即可。
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //使用CAS来增加workerCount,可能需要多次尝试
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    //重新的ctl值
                    c = ctl.get();  
                    //如果此时的ctl值和rs值不相等,说明状态以及改变,然后继续从头开始执行循环
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
            
            //线程启动标志
            boolean workerStarted = false;
            //添加到worker队列的标志
            boolean workerAdded = false;
            Worker w = null;
            try {
                //创建一个新的Worker对象
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    //加锁
                    mainLock.lock();
                    try {
                        //获取当前的状态
                        int rs = runStateOf(ctl.get());
                        
                        //如果小于SHUTDOWN,表示是RUNNING状态,那么就判断t是否是活跃状态,如果是的话,就抛出异常,因为可能由于并发的原因,该线程已经被其他的Worker使用了
                        //如果现在状态是SHUTDOWN状态且任务是空任务,并且线程处于活跃状态,那么就抛出异常
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) 
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            //如果一切正常,就继续执行下面的逻辑
                            //获取worker集合的大小
                            int s = workers.size();
                            //如果大小大于largestPoolSize,就将largestPoolSize设置为s
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            //能走到这,说明worker已经被添加到worker集合里了
                            workerAdded = true;
                        }
                    } finally {
                        //解锁
                        mainLock.unlock();
                    }
                    //如果worker已经被添加
                    if (workerAdded) {
                        //启动线程
                        t.start();
                        //线程启动标志设置为true
                        workerStarted = true;
                    }
                }
            } finally {
                //如果线程没有启动
                if (! workerStarted)
                    //调用添加addWorkerFailed()的逻辑
                    addWorkerFailed(w);
            }
            //最后返回线程启动标志
            return workerStarted;
        }
    

    上面的两段源码,都用到了worker,那么这个worker是什么呢?下面是Worker类的源码:

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    因为涉及到AbstractQueuedSynchronizer,即AQS,这里先不介绍其相关的方法,其实只要知道Worker保存着什么东西,也基本能理解了。Worker还实现了Runnable接口,所以也完全可以作为一个任务提交给线程,实际上,Worker就是对线程的封装,ThreadPool的各个操作的对象主要就是Worker,并不会直接操作Thread。

    Worker里两个重要字段是thread和firstTask,即线程实例和具体的任务。thread实例是由ThreadFactory产生的,调用ThreadFactory.newThread()即可获得一个thread实例,该方法接受一个Runnable实例,Worker类在获取thread传入的参数是this,即worker自己本身,从这可以看出,Worker和Thread其实是一个双向绑定的关系,当thead调用start方法的时候,会执行worker里run()逻辑,worker里的run只调用了一个runWorker()方法,该方法会判断各种状态,然后决定采用哪种方式执行任务,例如如果当前的任务为空,那么就尝试从工作队列里取出任务,然后执行。这里我就不带着大家分析runWroker方法以及其他方法了,比较不是专门源码分析的文章,希望大家能自己去看看,肯定能加深对源码的理解。

    3.3 自定义线程池

    我将要介绍的“自定义线程池”并不是指从头开始写一个线程池,而是指的自定义自己的线程工厂,拒绝策略等参数。如果是从头开始写一个线程池,那无异于“重复造轮子”,而且算是比较复杂的一个项目了,底层要考虑的东西很多,各个线程之间如何协调,如何防止发生死锁,如何防止内存泄露等问题都需要考虑。

    我们可以通过继承ThreadPoolExecutor来重写部分方法,因为ThreadPoolExecutor本身设计的时候就是可扩展的,可定制的,也预留了一些方法来让扩展者自行实现,例如beforeExecute()和afterExecute()等方法。同样,ThreadFactory和RejectedExecutionHandler都是我们可以自己实现的。下面是一个示例:

    //继承ThreadPoolExecutor,重写方法,最好还是不要重写execute等重要的方法,可能会造成逻辑混乱
    public class MyThreadPool extends ThreadPoolExecutor {
    
    
        public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("before execute");
            System.out.println(t + " will execute!");
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("after execute");
            System.out.println(Thread.currentThread() + "finish execute");
        }
    }
    
    //实现ThreadFactory接口,重写newThread方法即可
    public class MyThreadFactory implements ThreadFactory {
    
        //这里简单粗暴的直接new一个线程
        @Override
        public Thread newThread(Runnable r) {
            Thread newThread = new Thread(r);
            newThread.setName("myThread : " + newThread.getId());
            return newThread;
        }
    }
    
    //拒绝策略,实现RejectedExecutionHandler
    public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    
        //没做什么事
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("reject " + r);
        }
    }
    
    //测试类
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService =
                new MyThreadPool(1, 1, 10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                                new MyThreadFactory(), new MyRejectedExecutionHandler());
    
        for (int i = 0; i < 4; i++) {
            executorService.execute(() -> {
                System.out.println("hello,world");
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1000, TimeUnit.SECONDS);
    }
    
    
    

    其实很简单,阅读来源码之后,对于为什么可以自定义线程池的配置,应该是非常容易理解了的,就不多说了。

    4 小结

    本文介绍了线程池的基本概念,其特点、好处以及简单使用,之后还详细分析了ThreadPoolExecutor类的部分源码,希望读者能接着阅读剩余的源码,加深理解,最后还尝试自定义线程池的各项配置,ThreadPoolExecutor本身就是可扩展的,所以这个过程非常简单。

    5 参考资料

    深入理解 Java 线程池:ThreadPoolExecutor

    《Java并发编程实战》

    相关文章

      网友评论

        本文标题:Java并发编程(三):线程池

        本文链接:https://www.haomeiwen.com/subject/owkkhqtx.html