美文网首页
Java 线程池

Java 线程池

作者: 都是浮云啊 | 来源:发表于2019-01-22 19:14 被阅读0次

前言

线程池就是在初始化程序的时候创建的一个线程的集合,然后当程序在需要执行新任务时能够直接并复用这个集合里的线程。使用线程池的目的是避免频繁创建和销毁线程,达到线程对象的复用。此外,使用线程池还可以灵活控制并发的数目。服务器执行一项任务,创建一个线程时间为T1,执行任务的时间T2,销毁线程的时间为T3。大多数情况下 T1+T2 >> T3,这种情况下使用线程池能够显著提高服务器性能.同时如果不是太想了解细节,只是想应付下面试直接看8的总结就行了。这篇文章整理了好几天,参照了网上很多的博客也看了很多JDK的源码,再加上个人的一些理解,内容还是不少的。此外,篇幅实在太大了,所以JUC中的阻塞队列会单独拎出来。

[TOC]

1. 线程池

1.1 线程池的组成

一个线程池由4个部分组成的

  • 线程池管理器 (ThreadPool):用来创建并管理线程池
  • 工作线程 (Worker): 线程池中的线程,会根据线程的特性灵活调整数量
  • 任务接口 Task: 要提交给线程池执行的任务
  • 等待队列 (Queue): 存放还没来得及执行的任务,缓冲一下
1.2 线程池体系结构

自上而下简单介绍下这个继承树, Executor接口位于继承树的最上层,方法只有一个 execute(Runnable runnable) 这个方法的作用是提交任务到线程池中。往下就是 ExecutorService 可以看到它继承了Executor接口并且也提供了一些关于线程池的基本操作的方法比如暂停、不同提交、执行任务等,所以我们一般实现的时候就实现这个接口。按照 JDK 的惯例,一般都会有一个抽象类去实现接口并且提供部分通用功能的实现,此处就是 AbstractExecutorService 了。而ThreadPoolExecutor里面的方法非常的丰富,图片幅度的原因没截出来,这个类是一个普通类,没有抽象方法,通过上面继承来的抽象方法都是它来实现的(简直是能力担当有木有)。

线程池的体系结构.png

关于线程池中执行任务相关的一些实体:

  • Executors 这个名字就告诉我们了它是一个工具类,里面含有许多静态方法,用来对线程池执行一些操作,比如创建不同种类的线程池,线程池的种类下面有介绍.
  • 当任务提交给线程池之后,线程池是可以获取线程的执行结果的,所以引入了 Future 接口的,我们使用它定义的 FutureTask ,程序不断地向线程池提交任务(以线程的形式,而线程都是实现 Runnable 接口的),这个过程就是将实现了 Runnable 的任务包装成 FutureTask ,然后通过 FutureTask 提交到线程池中去执行并可以获取到结果。
  • BlockingQueue :是一些堵塞队列,我们往线程池中提交的任务太多可能会出现线程池处理不过来的情况,所以就需要一些堵塞队列,同时根据不同的队列的实现,线程池的功能也会有所不同。
    相关实体
1.3 线程池的种类

常见的线程池的种类有4种,详情在第5部分有具体介绍

  • 单个线程的线程池: newSingleThreadExecutor,单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行任务
  • 固定数量的线程池: newFixedThreadExecutor,固定数量的线程池,没提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列直到前面的任务完成才继续执行
  • 缓冲线程池(常用): newCacheThreadExecutor,当任务个数超过了核心线程数就把任务丢到队列中,当队列也满了就再创建一些线程直到最大线程数,当线程空闲唱过60s,那么就会回收部分空闲的线程
  • 大小无限制的线程池: newScheduleThreadExecutor,支持定时和周期性的执行

2. Executor 根接口

Executor 这个接口仅仅定义了一个方法 execute ,用来将任务提交到线程池里。

public interface Executor {
    /**
     * 提交Runnable任务到线程池中
     */
    void execute(Runnable command);
}

3. FutureTask

一个线程要提交任务到线程池中时,FutureTask 就是对将要提交到线程池的任务进行的一次封装。 之所以封装是因为我们知道 Runnable.run() 这个方法是没有返回值的,而使用了线程池之后,我们可能需要知道某个任务提交到线程池之后它的返回值信息, 所以会在 <T> Future<T> summit(Runnable task,T result) 这个方法中的第二个参数作为返回值。这2个参数最终都会被包装成 Callable ,它和 Runnable 的区别在于 run() 方法是否有返回值和是否抛出异常,如下源码所示。
FutureRunnable 的关系:
Future -> RunnableFuture -> FutureTask
Runnabel -> RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
 
    void run();
}
@FunctionalInterface
public interface Callable<V> {
    /**
     * 有返回值,会抛出异常
     */
    V call() throws Exception;
}

@FunctionalInterface
public interface Runnable {
    /**
     * 没有返回值,不抛异常
     */
    public abstract void run();
}

4. ExecutorService

上面介绍 Executor 接口的的时候说了它只有一个提交任务到线程池的方法。但是,实际开发的时候,我们除了 Executor 的提交任务功能还需要知道线程执行任务后的返回结果,以及线程池的一些基本属性比如空闲线程数这种的,也就是线程池的一些状态,或者我们优雅的关闭线程池而不是直接暴力关机。这种情况下就需要 ExecutorService 了。它的源码如下:

    /**
     * 继承自Executor接口
     */
public interface ExecutorService extends Executor {

    /**
     * 关闭线程池(只是不接受后来的提交任务请求了,不影响已经提交的任务)
     */
    void shutdown();

    /**
     * 区别于前面,只是它会尝试去停止正在执行的任务,依然不接受新的任务请求
     */
    List<Runnable> shutdownNow();

    /**
     * 返回线程池是否已经关闭
     */
    boolean isShutdown();

    /**
     * 关闭之后,所有任务都执行完毕返回true
     * 仅在调用shutdown()/shutdownNow()之后使用
     */
    boolean isTerminated();

    /**
     * 等待所有任务完成并且设置一个超时时间
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个Callable任务,都知道它和Runnable很大区别是有返回值
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个Runnable任务,第二个参数将会放在Future<T>中返回
     * 因为Runnable本身不返回东西,此处相当于上面的submit方法
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个Runnable任务
     */
    Future<?> submit(Runnable task);

    /**
     * 执行所有任务
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 执行所有任务并且设置超时时间
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 有一个任务结束返回了就把它返回
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 同上,只是多了一个设置的超时时间
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

5. AbstractExecutorService

从最开始的线程池家族树就可以看到, AbstractExecutorService 类实现了 ExecutorService 接口,并且实现了接口中的 invokeAny/invokeAll 等方法,此外还有2个 newTaskFor 方法用来把任务包装成 FutureTask。 这样的实现了的在子类中就不需要再次实现了直接用就行。主要源码如下:

//实现了ExecutorService接口
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * RunnableFuture 是用来获得返回结果的,我们一般用它的子类 `FutureTask`
     * newTaskFor 方法用来把我们提交的任务包装成 `FutureTask` ,这样就可以直接提交给线程池了
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * 提交任务到线程池
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 包装任务为FutureTask
        RunnableFuture<void> ftask = newTaskFor(task, null);
        //执行任务,FutureTask 间接实现了 Runnable 接口
        execute(ftask);
        //返回结果
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * 同上
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * 将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
     * 第二个参数 timed 代表是否设置超时机制,超时时间为第三个参数
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
         // 提交的任务个数
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 初始化一个用于保存任务结果的list
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        // 参数 this 是真正的执行器,它的作用是包装执行器,把每个结束的任务保存到内部的一个队列中
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        try {
  
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 待执行任务的迭代器
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 先提交一个任务,其余的通过循环取出
            futures.add(ecs.submit(it.next()));
            // 任务数-1
            --ntasks;
            // 当前执行任务数1个
            int active = 1;
            for (;;) {
                // poll检查队列中是否有待执行任务,有的话取出
                Future<T> f = ecs.poll();
                // 为null表等待队列中无任务等待,刚刚提交的已经去运行去了
                if (f == null) {
                    //如果总任务数 > 0,表示还能继续提交
                    if (ntasks > 0) {
                        --ntasks;
                        //提交下一个任务
                        futures.add(ecs.submit(it.next()));
                        //正在执行数+1
                        ++active;
                    }
                    //方法下方介绍
                    else if (active == 0)
                        break;
                    else if (timed) {
                    //超时监测
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                    //没有任务需要提交,池中有任务执行,take()堵塞直到任务完成返回给f
                        f = ecs.take();
                }
                
                //有任务结束了
                if (f != null) {
                    //正在执行数-1
                    --active;
                    try {
                       //返回结果
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
             /**
              * 关于这里的退出,要结合整个方法来看的,--active; 如果所有任务返回
              * 但是f.get出现异常,就表示任务执行失败了,循环从这里退出
              */

            //for截止
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
        //方法退出前取消其它任务有异常的情况/正常退出
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
    
    /**
     * 调用doInvokeAny
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
    
    /**
     * 调用doInvokeAny
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    /**
     * 执行所有任务返回结果
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                     // 这是一个阻塞方法,直到获取到值,或抛出了异常
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    /**
     * 带超时的执行所有任务
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}

AbstractExecutorService 中可以看出,它实现了一些基本的操作线程池的方法,然后在方法内部调用 execute(Runnable r) 方法去提交任务给线程池执行。然而我们在 AbstractExecutorService 中并没有看到它实现这个最重要的提交任务给线程池的方法。通过了解,这个方法就在大名鼎鼎的ThreadPoolExecutor中实现了。

6. ThreadPoolExecutor

ThreadPoolExecutor 是JDK底层对于线程池的实现,通过继承树和上面介绍可知,它是线程池的 能力担当,实现了所有上层的抽象类和接口定义的方法,包括任务提交、线程数量的管理等方法。这部分先看它的构造方法,构造方法用来创建一个具备一定特性的线程池。然后了解下线程池的一些状态,最后就是添加一个任务到线程池发生了什么了解它的工作原理。

6.1 ThreadPoolExecutor 构造方法 -- 创建线程池

从JDK底层提供的创建线程池工具类 Executors 最终使用的就是 ThreadPoolExecutor 的构造器来构建一个线程池的,可以看出来这个玩意是非常重要的了吧,所以这里看一下它提供的几种构造方法。构造器里主要对一些核心数值进行了校验,然后对核心数值进行初始化,根据这些值的不同,我们可以定义不同功能的线程池。

    /**
     * 构造方法
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 空闲生存时间
     * @param unit 时间单位
     * @param workQueue 阻塞队列
     * @param threadFactory 使用工厂创建一个线程,当需要的时候
     * @param handler 拒绝策略,当线程池和队列满了执行的阻止程序的策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
6.2 线程池的一些状态
  • RUNNING: 最正常的状态,接受新任务,处理队列中的任务
  • SHUTDOWN: 不再接受新任务,但是会处理队列中的任务
  • STOP:不接受新任务,不处理队列中的任务,中断正常执行的任务
  • TIDYING: 所有任务都销毁了,正在工作的线程也会回收,此时会执行钩子方法terminated()
  • TERMINATEDterminated() 方法执行完之后线程池的状态
    状态转换
6.3 拒绝策略

RejectedExecutionHandler 线程池的拒绝策略,当把任务添加到线程池的时候,而此时线程池和阻塞队列都满了的情况下,线程池会选择一种拒绝策略来处理该任务。在JDK中,已经提供了4种拒绝策略

  1. AbortPolicy :直接抛出异常,默认策略
  2. CallerRunsPolicy:用调用者所在的线程来执行任务
  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  4. DiscardPolicy:直接丢弃任务;

除此之外,我们也可以实现自己的拒绝策略,同时也方便记录下日志,实现 RejectedExecutionHandler 接口即可。

final void reject(Runnable command) {
    // 这里的handler在构造线程池的时候传入的参数,它是`RejectedExecutionHandler`的实例
    handler.rejectedExecution(command, this);
}
//RejectedExecutionHandler在ThreadPoolExecutor中定义了4种可以满足我们需求的实现类供使用如下
// 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

// 不管怎样,直接抛出 RejectedExecutionException 异常
// 这个是默认的策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

// 不做任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

// 这个相对霸道一点,如果线程池没有被关闭的话,
// 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

6.4 提交一个任务给线程池,发生了什么?
6.4.1 线程池里执行任务的角色

WorkerThreadPoolExecutor 的一个非常重要的内部类,它继承了 JUC中的 AbstractQueuedSynchronizer 也就是常说的AQS接口,实现了 Runnable 接口代表它的实例是一个线程对象。合起来就是具备同步功能的线程对象。所以可以认为Worker既是一个可以执行的任务,也可以达到获取锁释放锁的效果。这里继承AQS主要是为了方便线程的中断处理。这里注意两个地方:构造函数、run()。构造函数主要是做三件事:1.设置同步状态state为-1,同步状态大于0表示就已经获取了锁,2.设置将当前任务task设置为firstTask,3.利用Worker本身对象this和ThreadFactory创建线程对象。所以到这里,我们知道任务是 Runnable内部叫 task 或 command),任务提交到线程池中,而具体执行任务的线程是 Worker,任务给干活的人做,就是线程池的工作模式

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** 真正要执行任务的线程 */
        final Thread thread;
        /** 创建完一个线程的时候我们就会同时指定第一个要执行的任务。允许为null,这个时候线程起来了之后自己去堵塞队列中getTask取任务 */
        Runnable firstTask;
        /** 此线程完成的任务数,valatile声明的,内存可见 */
        volatile long completedTasks;

        /**
         * 构造方法,创建线程的时候给一个任务,也可以给null
         */
        Worker(Runnable firstTask) {
            setState(-1); // 在runWorker执行之前不响应中断标志
            this.firstTask = firstTask;
            //创建一个新的线程赋值给真正要去执行任务的线程
            this.thread = getThreadFactory().newThread(this);
        }

        /** 调用外部的runWorker方法  */
        public void run() {
            runWorker(this);
        }
         /** 值为1表示被锁状态,0是没有锁状态  */
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
         /** CAS获取锁  */
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
         /** 释放锁  */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
         /** 线程中断  */
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
6.4.2 从队列中获取任务的方法

有一个非常重要的取出任务的方法一直没说,这里一起说一下 getTask() 这个方法是获取任务的方法。这个方法有三种可能:

  1. 阻塞直到获取到任务返回。默认 corePoolSize 之内的线程是不会回收的,它会一直等待任务。
  2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
  3. 发生了以下几种情况,此方法直接返回null
    • 线程池中有大于最多线程数(maximumPoolSize)的worker存在
    • 线程池处于SHUTDOWN,而且workerQueue空的,此种情况不接收新任务
    • 线程池处于STOP状态,不仅不接受新的线程,连workQueue中的也不再执行了
private Runnable getTask() {
        boolean timedOut = false; 

        for (;;) {
            // c 中的高三位表示线程池状态,低29位表示线程个数
            int c = ctl.get();
            int rs = runStateOf(c);
            // 
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //CAS减少工作线程数
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
            //到WorkeQueue中获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
6.4.3 提交任务到线程池中

上面有了要执行的任务还有了执行任务的线程,就可以支持提交任务给线程池执行了也就是 execute() 方法。比如工厂生产,有了订单、有了工人就可以把这些订单提交给工人让工人去完成了。看下这个方法

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
         //如果当前线程数少于核心线程数,会直接添加一个新的worker来执行
        if (workerCountOf(c) < corePoolSize) {
            //创建worker并把command作为该线程的firstTask  之前有说过
            if (addWorker(command, true))//这里成功就直接返回了
                return;
            c = ctl.get();
        }
        //到这里有2种情况,addWorker不成功/当前线程数大于等于核心线程数
        //线程池运行中,把任务添加到workQueue工作队列中
        if (isRunning(c) && workQueue.offer(command)) { 
            //任务的确是进来了,但是[0, corePoolSize)这个范围是要开启新的线程的,这里要做的就是这些处理      
            int recheck = ctl.get();
            //双重检查,如果线程池不是在running,就移除这个任务并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果线程还是running的,并且线程数为0就开启新的线程。防止把任务提交过去了但是线程关闭了这种情况
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //到这里说明workQueue队列满了,以最大线程数为界创建新的Worker,还失败的话说明超出最大线程数了,直接拒执行策略。这一步其实就是再抢救一下看看还有没有得救了
        else if (!addWorker(command, false))
            reject(command);
    }

从这里可以看出,并不是线程数少于corePoolSize就立刻创建线程。也不是线程数在[corePoolSize,maximumPoolSize]之间就只复用现有的线程。还是会尝试着再去创建一些线程放到队列里的。实在不行了才会执行拒绝策略。上面多次提到了addWorker这方法,可以看出它是很重要的,它的源码如下:

//参数1是提交给这个线程执行的任务,可以为Null。参数2是表示是否使用核心线程数corePoolSize作为边界
//false表示以最大线程数maximumPoolSize作为边界
private boolean addWorker(Runnable firstTask, boolean core) {
        //代表某一层for循环的标志,不要忘了基础哈
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池关闭了,并且满足下列条件就不会创建新的worker
            //1. 线程池状态大于SHUTDOWN也就是STOP、TIDYING、TERMINATED
            //2. firstTask 不是null也就是创建的线程是有任务要执行的
            //3. workQueue是空的
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                  //如果成功就准备创建线程执行任务了,失败的话说明有其它线程尝试往线程池中创建线程
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // 重新读取ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // 如果CAS失败的话进到下一层循环,如果因为其它线程的操作,导致线程池的状态发生了变更,结束外层循环
            }
        }
        //校验完毕,可以开始创建线程执行任务了
        //worker是否已经启动
        boolean workerStarted = false;
        //是否添加到workers这个HashSet中
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个新的Worker并且把firstTask作为第一任务
            w = new Worker(firstTask);
            //取出Worker中线程对象
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //获取一个全局的锁,保证在持有锁期间线程池不会被关闭
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //小于 SHUTTDOWN 那就是 RUNNING,这个自不必说,是最正常的情况
                    //等于SHUTDOWN,不接受新的任务,但是会继续执行等待队列中的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // worker里的thread如果是已经启动就是线程状态不对异常,它应该是没有被启动的线程
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                         //添加到HashSet中
                        workers.add(w);
                        int s = workers.size();
                        //largestPoolSize 用于记录 workers 中的个数的最大值
                        //workers是不断变化的,通过这个值可以知道线程池达到的最大值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                //保证锁被释放
                    mainLock.unlock();
                }
                //添加成功的话把线程转换成启动状态
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        //如果线程没有启动,清理之前的操作,比如运行线程数+1的操作,如下代码
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //返回线程是否启动成功
        return workerStarted;
    }
private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        //获取锁
        mainLock.lock();
        try {
            //从workers中移除
            if (w != null)
                workers.remove(w);
             //原子操作将wokerCount-1
            decrementWorkerCount();
            tryTerminate();
        } finally {
            //保证锁被释放
            mainLock.unlock();
        }
    }
//Worker中的run方法
public void run() {
    runWorker(this);
}
//Worker中的runWorker方法
final void runWorker(Worker w) {
        //获取当前线程
        Thread wt = Thread.currentThread();
        //worker在构造初始化的时候可以指定firstTask。
        Runnable task = w.firstTask;
        //help gc
        w.firstTask = null;
        //
        w.unlock();
        boolean completedAbruptly = true;
        try {
            //循环调用getTask()取要执行的任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 如果线程池是STOP状态,线程需要中断,如果线程池状态大于STOP也要中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                //这是一个钩子方法,留给需要的子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //最终执行任务的地方
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                    //// 这里不允许抛出 Throwable,所以转换为 Error
                        thrown = x; throw new Error(x);
                    } finally {
                    //钩子方法,将task和异常作为参数留给需要的子类实现
                        afterExecute(task, thrown);
                    }
                } finally {
                //置空task,准备下次循环getTask获取下一个任务
                    task = null;
                    //累加完成任务个数
                    w.completedTasks++;
                    //释放独占锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
         //到这里需要线程关闭操作,Worker完成所有的操作或者出现了异常
            processWorkerExit(w, completedAbruptly);
        }
    }

到此,ThreadPoolExecutor的源码分析已经结束了,单纯从源码的难易程度来说,ThreadPoolExecutor的源码还算比较容易让人懂的。前提是对JUC中的部分功能有一些理解,比如各种用来作为堵塞队列的实现。

7. Executors工具类

它仅仅是一个JDK提供的生成线程池的工具类。工具类有个特点是所有方法都是 static 的。它提供了常用的线程池的一些创建实现,但是阿里巴巴的规范中不建议使用工具类来创建线程池,我们一般可以自己实现一个,具体在我的 github 上面有相关线程池的demo。简单介绍下它提供的常用的线程池的创建:

7.1 固定大小的线程池

最大线程数等于核心线程数,keepAliveTime = 0 , 因为它不需要回收 corePoolSize 的内容,任务队列采用LinkedBlockingQueue是一个无界队列。刚开始每个提交的任务都会创建一个 worker ,当worker 数量达到参数nThreads 的时候,不会再创建新的线程,而是把任务放到堵塞队列中,等待线程池去取。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

7.2一个线程的线程池

过程和上面定长的类似,因为1就是定长为1

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
7.3 长度可伸缩的线程池

最初的时候核心线程数为0,最大可以伸长到 Integer.MAX_VALUE 。空闲线程的存活时间是 60s 任务队列采用 SynchronousQueue ,它是一种特殊的堵塞队列,本身不存储任何元素,存在一个虚拟的队列,不管是读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也会进入队列中等待,如果是相反模式,则从当前队列中取出头节点。这种线程池一般就是用来处理那些完成任务很快的线程,可以完成很多很多的任务,当不用的时候那些超过 60s 没动静的线程就会被回收。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

以缓冲线程池为例,分析下:把 ThreadPoolExecutor 中的execute拿来看看。因为coreSize为0,那么提交任务的时候直接将任务提交到队列中。采用了SynchronousQueue队列,所以第一个任务提交过来的时候,offer返回false ,此时没有Worker对这个任务进行接收,进入到创建worker环节。之后再次提交任务的时候,先看是否有空闲的 worker 如果没有重复上述动作,如果有直接复用。

int c = ctl.get();
// corePoolSize 为 0,所以不会进到这个 if 分支
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// offer 如果有空闲线程刚好可以接收此任务,那么返回 true,否则返回 false
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        //创建一个首任务null的线程
        addWorker(null, false);
}
else if (!addWorker(command, false))
    //添加成功,任务正式提交给线程池处理执行
    //添加失败,一般是还有其它的线程也在添加了,执行拒绝策略
    reject(command);

案例:有那么一家小工厂 (线程池),它接收来自客户的订单(FutureTask)然后让自己的工人(Worker)完成。它处理能力有限,不能够无限制的接收很多客户的订单,所有来自客户的订单它都是由工厂的工人完成的。于是有的时候老板发现客户端订单量很大,它手底下的人处理不过来,这个时候它就需要去招聘一些临时工来干活。当订单量不大的时候,它发现这些工人们不白养他们,于是定下了一个规矩:如果谁空闲的时间超过了 60s,就要被辞退。通过这种方法来达到最大收益。

8.总结

8.1 Java 线程池脑图
Java 线程池脑图
8.2 一些流程图
execute().png
8.3 一些常见的面试题(这部分都是见到的一些有意思的面试题)
  1. 线程池有哪些关键属性
  • 核心线程数corePoolSize,表示线程池中维护的用来接收任务的线程数。比如一个农场有多少有编制的工人。
  • maximumPoolSize最大线程数,一个农场最多允许的工人个数,包括临时工。
  • workQueue 用来存放任务,添加任务的时候,如果当前线程数超过了corePoolSize那么往该队列中插入任务,线程池中空闲的线程会负责到这个队列中拉取任务。
  • keepAliveTime 用来设置空闲时间,如果线程数超过了corePoolSize并且有些线程的空闲时间超过了这个值就会回收掉那部分线程,解决资源。
  • rejectedExecutionHandler用来处理当线程池不能执行此任务时,拒绝策略,可以抛出异常、忽略任务等四种策略。
  1. 线程池中线程的创建时机?
  • 2.1 如果当前线程数少于corePoolSize,那么提交任务的时候创建一个新线程,并由这个线程执行这个任务。
  • 2.2 如果当前线程数已经达到corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取出。
  • 2.3 如果队列满了,将创建新的线程来执行任务,需要保证池中的线程数不会超过最大线程数,否则拒绝策略返回
    3.常用的线程池有哪几种?
  • 如上,常用的有三种,分别是定长、单个、缓冲。其实它们的主要区别是初始的大小和堵塞队列的时间不同。
  1. 添加一个任务到线程池,发生了什么?
    答:

    1. 根据worker获取要执行的任务task,然后调用unlock()方法释放锁,这里释放锁的主要目的在于中断,因为在new Worker时,设置的state为-1,调用unlock()方法可以将state设置为0,这里主要原因就在于interruptWorkers()方法只有在state >= 0时才会执行;
    2. 通过getTask()获取执行的任务,调用task.run()执行,当然在执行之前会调用worker.lock()上锁,执行之后调用worker.unlock()放锁;
    3. 在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法,则两个方法在ThreadPoolExecutor中是空实现;
    4. 如果线程执行完成,则会调用getTask()方法从阻塞队列中获取新任务,如果阻塞队列为空,则根据是否超时来判断是否需要阻塞
  2. 为什么要使用线程池?

    1. 通过重用线程降低线程创建和销毁的消耗
    2. 提高响应速度,这样的话任务到达时不需要自己去创建线程了
    3. 提高线程的可管理性,线程在一个地方创建,可以管理线程的数量、分配、调优等
  3. execute() 和 submit() 的区别?
    submit()有返回值 可使用 FutureTask 接收。

  4. 堵塞队列中的是什么?
    阻塞队列的泛型使用的都是 Runnable,代表放进来的都是一个个的需要被执行的任务

相关文章

网友评论

      本文标题:Java 线程池

      本文链接:https://www.haomeiwen.com/subject/mejrjqtx.html