美文网首页
JDK8中自定义线程池的使用

JDK8中自定义线程池的使用

作者: 豪大大大 | 来源:发表于2018-07-30 22:17 被阅读0次

    1、ThreadPoolExecutor类

    java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

    在ThreadPoolExecutor类中提供了四个构造方法:

    public class ThreadPoolExecutor extends AbstractExecutorService {

        .....

        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

                BlockingQueue workQueue);

        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

                BlockingQueue workQueue,ThreadFactory threadFactory);

        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

                BlockingQueue workQueue,RejectedExecutionHandler handler);

        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,

            BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

        ...

    }

    从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

    下面解释下一下构造器中各个参数的含义:

    (1)corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,在等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程,并且在创建完线程后都会保持线程数。如果希望线程在一定存活时间后自动销毁,则可调用allowCoreThreadTimeOut()方法。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    (2)maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    (3)keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize;

    (4)unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS; //天

    TimeUnit.HOURS; //小时

    TimeUnit.MINUTES; //分钟

    TimeUnit.SECONDS; //秒

    TimeUnit.MILLISECONDS;//毫秒

    TimeUnit.MICROSECONDS;//微妙

    TimeUnit.NANOSECONDS; //纳秒

    (5)workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue;

    LinkedBlockingQueue;

    SynchronousQueue;

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

    (6)threadFactory:线程工厂,主要用来创建线程;

    (7)handler:表示当拒绝处理任务时的策略,有以下四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    具体参数的配置与线程池的关系将在下一节讲述。

      从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

    public abstract class AbstractExecutorService implements ExecutorService {

        protected  RunnableFuture newTaskFor(Runnable runnable, T value) { };

        protected  RunnableFuture newTaskFor(Callable callable) { };

        public Future submit(Runnable task) {};

        public  Future submit(Runnable task, T result) { };

        public  Future submit(Callable task) { };

        private  T doInvokeAny(Collection> tasks, boolean timed, long nanos)  throws InterruptedException, ExecutionException,TimeoutException {};

        public  T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException {};

        public  T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { };

        public  List> invokeAll(Collection> tasks) throws InterruptedException { };

        public  List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException {};

    }

     AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

      我们接着看ExecutorService接口的实现:

    public interface ExecutorService extends Executor {

        void shutdown();

        boolean isShutdown();

        boolean isTerminated();

        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

         Future submit(Callable task);

         Future submit(Runnable task, T result);

        Future submit(Runnable task);

         List> invokeAll(Collection> tasks) throws InterruptedException;

         List> invokeAll(Collection> tasks, long timeout, TimeUnit unit)  throws InterruptedException;

         T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException;

         T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

    }

    而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

    public interface Executor {

        void execute(Runnable command);

    }

    到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

    Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

    然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

    抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

    然后ThreadPoolExecutor继承了类AbstractExecutorService。

    在ThreadPoolExecutor类中有几个非常重要的方法:

    execute()

    submit()

    shutdown()

    shutdownNow()

    execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

    submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

    shutdown()和shutdownNow()是用来关闭线程池的。

    还有很多其他的方法:

      比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

    2、深入剖析线程池实现原理

    在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:

           1.线程池状态

      2.任务的执行

      3.线程池中的线程初始化

      4.任务缓存队列及排队策略

      5.任务拒绝策略

      6.线程池的关闭

      7.线程池容量的动态调整

    (1)线程池生命周期

    在ThreadPoolExecutor中将运行状态存储在高阶位上,另外定义了5个static final变量表示线程池的各个生命周期:

    private static final int RUNNING = -1 << COUNT_BITS; //运行状态

    private static final int SHUTDOWN = 0 << COUNT_BITS; //不接受新任务,但在处理排队的任务。

    private static final int STOP = 1 << COUNT_BITS; //不接受新的任务,也不处理队列中的任务,中断正在进行的任务

    private static final int TIDYING = 2 << COUNT_BITS; //所有的任务终止,队列中的任务也为零,线程池为tidying,将调用terminated()

    private static final int TERMINATED = 3 << COUNT_BITS; //调用terminated()方法完成后

    当创建线程池后,初始时,线程池处于RUNNING状态;

    RUNNING -> SHUTDOWN:在调用shutdown()时,可能隐式地在finalize()中

    (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()

    SHUTDOWN -> TIDYING:当线程池还有队列中的任务都为空时

    STOP -> TIDYING:当线程池为空时

    TIDYING -> TERMINATED:当调用 terminated()方法完成时

    (2)任务的执行

    在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

    private final BlockingQueue workQueue; //任务缓存队列,用来存放等待执行的任务

    private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小 、runState等)的改变都要使用这个锁

    private final HashSet workers = new HashSet(); //用来存放工作集

    private volatile long keepAliveTime; //线程存货时间   

    private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间

    private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)

    private volatile int maximumPoolSize; //线程池最大能容忍的线程数

    private volatile int poolSize; //线程池中当前的线程数

    private volatile RejectedExecutionHandler handler; //任务拒绝策略

    private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程

    private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数

    private long completedTaskCount; //用来记录已经执行完毕的任务个数

        每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

      corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:

      假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

      因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

      当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

      如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

      然后就将任务也分配给这4个临时工人做;

      如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

      当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

      这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

      也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

      不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。

      largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

    下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

      在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

    public void execute(Runnable command) {

    if (command ==null)

    throw new NullPointerException();

        int c =ctl.get();

        if (workerCountOf(c)

    if (addWorker(command, true))

        return;

            c =ctl.get();

        }

    if (isRunning(c) &&workQueue.offer(command)) {

    int recheck =ctl.get();

            if (!isRunning(recheck) && remove(command))

    reject(command);

            else if (workerCountOf(recheck) ==0)

    addWorker(null, false);

        }

    else if (!addWorker(command, false))

    reject(command);

    }

    方法注释:

     Proceed in 3 steps:

    1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task.  The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.

    当运行线程小于corePoolSize 数量时,开启一个新的线程去运行第一个任务。调用addWorker 方法原子性的检查runState和workerCount

    ,防止错误提醒。

    2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.

    如果任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为自上次检查后现有的线程已经死亡),或者自从进入此方法后池关闭了。 所以我们重新检查状态,如果没有运行则回滚入队并拒绝该任务,或者如果没有则启动新的线程。

     3. If we cannot queue task, then we try to add a new thread.  If it fails, we know we are shut down or saturated and so reject the task.

    如果我们不能排队任务,那么我们尝试添加一个新线程。 如果失败,我们知道我们已关闭或饱和,因此拒绝该任务。

    (3)线程池中的线程初始化

    默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

      在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

    prestartCoreThread():初始化一个核心线程;

    prestartAllCoreThreads():初始化所有核心线程

    public boolean prestartCoreThread() {

        return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);

    }

    public int prestartAllCoreThreads() {

        int n = 0;

        while (addWorker(null, true))

            ++n;

        return n;

    }

    (4)任务缓存队列及排队策略

    在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

      workQueue的类型为BlockingQueue,通常可以取下面三种类型:

      1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

      2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

      3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

    (5)任务拒绝策略

      当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    (6)线程池的关闭

      ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    (7)线程池容量的动态调整

    ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

    setCorePoolSize:设置核心池大小

    setMaximumPoolSize:设置线程池最大能创建的线程数目大小

      当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

    总结:在jdk8中线程池主要使用了CAS无锁方法,重复校验方式,跟方便阅读代码并速度提升

    参考:https://www.cnblogs.com/dolphin0520/p/3932921.html

    相关文章

      网友评论

          本文标题:JDK8中自定义线程池的使用

          本文链接:https://www.haomeiwen.com/subject/bxttvftx.html