美文网首页技术杂谈
Java线程池分析

Java线程池分析

作者: 香芋牛奶面包 | 来源:发表于2019-02-16 16:27 被阅读1次

    引言

    在并发编程中,我们经常会使用到线程池,当然我们也可以手动一个一个创建线程,那么为何我们还是推崇大家使用线程池进行并发编程呢?借用《Java并发编程的艺术》提到的来说一下使用线程池的优点有3个

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    ThreadPoolExecutor类

    在日常使用中,大多数情况我们都会使用JDK提供的Executors去创建线程池,Executors利用工厂模式向我们提供了4种线程池实现方式,但是最新的阿里Java开发手册中明确说明了不建议使用Executors去创建线程池,原因是使用Executors创建线程池会使用很多默认值,默认使用的参数有时候是不合理,但是开发者往往会忽略。所以我们应该尽量使用ThreadPoolExecutor类来显示的创建线程池。

    下面我们先来看下ThreadPoolExecutor类的继承结构

    image.png
    • Executor接口只含有一个execute(Runnable command)方法,加入一个新的线程
    • ExecutorService接口相比Executor多个几个方法
    public interface ExecutorService extends Executor {
       void shutdown();
       boolean isShutdown();
       boolean isTerminated();
       boolean awaitTermination(long timeout, TimeUnit unit)
           throws InterruptedException;
       <T> Future<T> submit(Callable<T> task);
       <T> Future<T> submit(Runnable task, T result);
       Future<?> submit(Runnable task);
       <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
           throws InterruptedException;
       <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
           throws InterruptedException;
    
       <T> T invokeAny(Collection<? extends Callable<T>> tasks)
           throws InterruptedException, ExecutionException;
       <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
           throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    我们直接看ThreadPoolExecutor所提供的构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    

    现在我们来解释一下各个参数的含义

    • corePoolSize 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系
    • maximumPoolSize 线程池最大线程数
    • keepAliveTime 空闲线程的存活时间,默认只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用
    • unit 参数keepAliveTime的时间单位
    • workQueue 一个阻塞队列,用来存储等待执行的任务
    • threadFactory 线程工厂,主要用来创建线程
    • handler 表示当拒绝处理任务时的策略,有以下四种取值
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
    

    下面再简单再介绍一下ThreadPoolExecutor中几个重要的方法

    • execute 这个方法其实就是对Executor接口的实现,也是最核心的方法,用于向线程池中提交一个任务执行
    • submit 该方法也是用于向线程池里提交任务,区别在于submit提交可以有返回值,会返回一个Futrue类型来异步获取执行结果
    • shutdown 用于关闭线程池
    • shutdownNow 也是关闭线程池,与shutdown区别在于shutdownNow会尝试关闭正在执行的线程任务

    通过源码分析 ThreadPoolExecutor

    上文我们简单的分析了ThreadPoolExecutor,下面我们将根据源码来进一步分析ThreadPoolExecutor核心功能的实现

    先从其核心方法execute开始解读

    public void execute(Runnable command) {
          // 如果传入空对象 则抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        
        /**
         * 如果当前正在执行任务的线程数小于 corePoolSize
         */
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        /**
         * 如果当前正在执行任务的线程数大于corePoolSize,且workQueue未满
         */
        if (isRunning(c) && workQueue.offer(command)) {
                // 重复检查一次,防止正好此时线程池被shutdown
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /**
         * 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则使用        * maximumPoolSize参数创建线程,如果已经到了maximumPoolSize最大值,则启用拒绝      * 策略
         */
        else if (!addWorker(command, false))
            reject(command);
    }
    

    上面的代码简单总结一下就是

    1. 当前正在执行任务的线程数小于corePoolSize时,正常加入执行
    2. 当前正在执行任务的线程数大于corePoolSize,且workQueue未满时,则将任务加入等待队列workQueue
    3. 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则会临时扩充线程数,根据maximumPoolSize最大线程数值
    4. 如果当前正在执行的任务线程数大于maximumPoolSize 这时已经超出最大可承受的线程数值了,会启用拒绝策略,也就是上文所配置的四种策略之一,默认是抛出异常,加入任务失败

    workQueue 任务队列

    上文有提到任务队列,也就是在等待执行的任务队列。workQueue的本质是一种阻塞队列BlockingQueue<Runnable>,列举三种常用类型

    1. 基于数组的先进先出队列,此队列创建时必须指定大小
    2. 基于链表的先进先出队列
    3. 同步队列 该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直阻塞

    核心功能分析

    上文中我们通过查看源码知道了新任务加入线程池的策略,下面我们继续往下看,再分析之前,我们可以先思考几个问题

    1. 任务等待队列是在何时被执行?
    2. 线程是如何实现重复利用的?,毕竟这是线程池最重要的功能了
    3. 空闲线程根据keepAliveTime参数是在哪里被回收的?

    接下来我们继续通过源码来解读这三个问题
    根据上文execute方法源码,可以看到调用了addWorker方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                // 这里判断当前线程数是否大于最大值,大于了则返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
                // 新建了一个 Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 把当前这个工作线程加入到 workers 集合中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                      // 开始执行此worker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    • 根据参数core 判断当前线程数是否已经大于corePoolSizemaximumPoolSize,大于则直接返回false
    • 初始化了一个Worker对象,并且加入到了workers集合中,workers是一个Set类型的集合
    • 启动当前这个Worker线程任务

    经过上面的分析,可能大概会有疑惑,Worker对象是做啥的?下面我们就来看下Worker类的实现

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        
        ...
    }
    
    • Worker实现了Runnable接口
    • 构造方法中通过ThreadFactory初始化了一个新的线程对象
    • run()方法调用了ThreadPoolExecutor的runWorker()

    重点在runWorker方法中

    线程池重复利用机制

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 获取第一个任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
                // 从任务队列中循环获取任务执行 getTask方法有可能会阻塞
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                      // 空实现,可以通过继承`ThreadPoolExecutor`重写此方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                            // 执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    代码解析到这里,基本可以回答上文所提出的三个问题了

    • 加入到任务队列的任务会在这里被执行,因为Worker对象执行完一个任务后,并不会立刻结束,而是会通过循环调用getTask()方法从任务队列中获取最新任务来执行,这样子就实现了线程的重复利用,而不必每次都重新创建一个Worker对象
    • getTask()方法无法获取到最新的任务后,该Worker线程才会被回收。所以第三个问题keepAliveTime的机制必然是在getTask()中实现的

    getTask 超时策略

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // workers大于corePoolSize,或则允许corePoolSize设置空闲超时时间
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
                // 当前线程数已经大于maximumPoolSize或则已经超时过一次,则直接返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                     // 获取任务,如果timed为true,则等待一定时间(keepAliveTime)未返回的话,会返回null,如果timed未设置为true,则会一直阻塞,直到有数据
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    
    • 先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null
    • 判断当前线程池的线程数是否已经大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用workQueue.poll(time,timeUnit)来取任务,这个方法会阻塞等待一定的时间,如果取不到任务就返回null,否则则会调用workQueue.take()来取任务,这个方法会wait释放CPU一直阻塞。

    总结一下,只有当我们设置allowCoreThreadTimeOut(核心池线程空闲超时时间)或则当前线程数大于corePoolSize时,keepAliveTime机制才会生效

    整个流程到这里大概就分析完了,下图基本绘制了线程池提交任务,执行任务的整个流程

    image.png

    Executors 中几个常用的线程池

    虽然阿里Java开发规约中不建议我们使用Executors类直接创建线程池,但还是有必要简单介绍几个其中常用的方法

    • Executors.newCachedThreadPool() 创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE. 将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUEworkQueue使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程
    • Executors.newSingleThreadExecutor() 创建容量为1的缓冲池,将corePoolSizemaximumPoolSize都设置为1,workQueue使用的LinkedBlockingQueue
    • Executors.newFixedThreadPool(int) 创建固定容量大小的缓冲池,空闲线程不会销毁。corePoolSizemaximumPoolSize值是相等的,workQueue使用的是LinkedBlockingQueue

    总结

    本篇文章主要从对线程池的配置使用,以及源码实现做了分析,总体上比较全面的分析了Java线程池的实现。

    博客原文地址戳这里

    相关文章

      网友评论

        本文标题:Java线程池分析

        本文链接:https://www.haomeiwen.com/subject/abqieqtx.html