美文网首页
线程池第一印象~

线程池第一印象~

作者: SonyaBaby | 来源:发表于2019-05-28 18:24 被阅读0次
    进阶线程池啦~.png ThreadPoolExecutor 继承关系.png ThreadPoolExecutor 方法Structure.png AbstractExecutorService 方法Structure.png ExecutorService 方法Structure.png 最顶层接口 Executor.png

    ThreadPoolExecutor

    public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
        }
    
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    ...
    }
    
    ThreadPoolExecutor 四个构造器.png

    看源码可知前三个构造器最终调用的都是第四个进行初始化工作。

    workQueue 等待队列

    • ArrayBlockingQueue 使用较少。必须指定 capacity,即有界队列
    • PriorityBlockingQueue 使用较少。默认大小 DEFAULT_INITIAL_CAPACITY = 11,最大MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8,即无界有序队列
    • LinkedBlockingQueue 默认大小 Integer.MAX_VALUE,即无界队列
    • SynchronousQueue 内部并没有数据缓存空间,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程(快速传递元素的方式),在多任务队列中是最快处理任务的方式,元素总是以最快的方式从生产者传递给消费者。典型应用是Executors.newCachedThreadPool(),这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
    • 线程池的排队策略与所选的 BlockingQueue 有关。

    handler 拒绝处理任务时使用的策略

    • ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常,会阻止正常工作。
    • ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常,系统正常工作。
    • ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务(即将被执行的),然后重新尝试提交当前任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

    核心方法
    execute() 是在Executor接口中的声明,通过这个方法可以向线程池提交一个任务,交由线程池去执行
    submit() 是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
    shutdown() 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    shutdownNow() 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    线程池状态

        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
        /**
         *   RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
         *   
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
        **/
    
    • 创建线程池后,初始时,线程池处于RUNNING状态;
    • 调用 shutdown(),则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
    • 调用 shutdownNow(),则线程池处于STOP状态,此时线程池不能接受新的任务,并且尝试终止正在执行的任务;
    • 当线程池处于SHUTDOWN或STOP状态,队列和线程池中都为空的情况,即所有任务都已被终止,workerCount 标记为 0,则处于TIDYING状态
    • 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁(处于TIDYING状态),任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

    任务的执行相关重要参数

     // 任务缓存队列,用来存放等待执行的任务
    private final BlockingQueue<Runnable> workQueue;
    // 线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
    private final ReentrantLock mainLock = new ReentrantLock();
    
    // Accessed only under mainLock.
    // 用来存放工作集   
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 用来记录线程池中曾经出现过的最大线程数
    private int largestPoolSize;
    // 用来记录已经执行完毕的任务个数
    private long completedTaskCount;
    
    // 线程工厂类,用来创建线程
    private volatile ThreadFactory threadFactory;
    // 任务拒绝策略
    private volatile RejectedExecutionHandler handler;
    // 线程存活时间
    private volatile long keepAliveTime;
    // 是否允许为核心线程设置存活时间
    private volatile boolean allowCoreThreadTimeOut;
    // 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
    private volatile int corePoolSize;
    
    // //线程池最大能容忍的线程数. Note that the actual maximum is internally bounded by CAPACITY.
    private volatile int maximumPoolSize;
    
    // 默认的任务拒绝策略:丢弃任务并抛出RejectedExecutionException异常
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    

    corePoolSize是正常情况下线程池大小,maximumPoolSize是线程池的一种额外Support,即任务量突然过大时的额外可支持的开销
    largestPoolSize只是用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

    execute()

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
       
        /** 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first  task.  
          * The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add  threads when it shouldn't, by returning false.
          */
        // 线程池中正在作业的线程数 < corePoolSize数
        if (workerCountOf(c) < corePoolSize) {
            // 是否可以继续向 corePool 中新增线程
            if (addWorker(command, true))
                return;
            // 再次获取当前线程池状态值
            c = ctl.get();
        }
    
        /** 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method.
          * So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
          */
        // 线程池为可运行状态,同时Runnable command可以加入等待队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //  非Running状态,则从队列中去掉command并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
    
           /** 3. If we cannot queue task, then we try to add a new thread.  
             * If it fails, we know we are shut down or saturated and so reject the task.
             */
             // 新增thread
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 不可以在拓展的线程池中运行该实例,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
    

    使用 ThreadPoolExecutor 创建线程池:

    public class HelloThreadPool {
      public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2,
          100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2));
        
        IntStream.range(0, 4).mapToObj(PrintTask::new).forEach(printTask -> {
          executor.execute(printTask);
          System.out.println("线程池中所有线程数目:" + executor.getPoolSize() + ",队列中待执行的任务数目:" +
            executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
        });
        
        executor.shutdown();
      }
    }
    
    class PrintTask implements Runnable {
      private int taskIndex;
      
      PrintTask(int index) {
        this.taskIndex = index;
      }
      
      @Override
      public void run() {
        System.out.println(taskIndex + " is running...");
         try {
         TimeUnit.SECONDS.sleep(2);
         } catch (InterruptedException e) {
         e.printStackTrace();
         }
        System.out.println(taskIndex + " end");
      }
    }
    

    output:

    0 is running...
    线程池中所有线程数目:1,队列中待执行的任务数目:0,已执行完的任务数目:0
    线程池中所有线程数目:1,队列中待执行的任务数目:1,已执行完的任务数目:0
    线程池中所有线程数目:1,队列中待执行的任务数目:2,已执行完的任务数目:0
    线程池中所有线程数目:2,队列中待执行的任务数目:2,已执行完的任务数目:0
    3 is running...
    3 end
    0 end
    1 is running...
    2 is running...
    2 end
    1 end
    

    当把要执行的实例变成 5 个就会出现 RejectedExecutionException 异常:

    java.util.concurrent.RejectedExecutionException: 
    Task threadPool.PrintTask@7699a589 rejected from 
    java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    

    java doc中,并不提倡我们直接使用 ThreadPoolExecutor ,而是使用 Executors 类中提供的几个静态方法来创建线程池

    • Executors.newCachedThreadPool() 若线程池的当前规模超过了`corePoolSize`,就会回收部分空闲的线程(根据`keepAliveTime`来回收),当需求增加时,线程池又可以智能的添加新线程来处理任务。此线程池大小`Integer.MAX_VALUE`可以认为是不做限制(使用队列`SynchronousQueue`),线程池大小完全依赖于JVM能够创建的最大线程大小

    • Executors.newSingleThreadExecutor() 创建容量为1的线程池,`corePoolSize`和`maximumPoolSize`均为1,使用无界队列`LinkedBlockingQueue`

    • Executors.newFixedThreadPool(int) 创建容量为固定个数n的线程池。`corePoolSize`和`maximumPoolSize`均为n,使用无界队列`LinkedBlockingQueue`

    • Executors.newScheduledThreadPool(int) 创建一个指定corePoolSize的线程池,支持定时及周期性任务执行

    • 如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        }
    
        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
        }
    
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    

    参考链接:https://www.cnblogs.com/dolphin0520/p/3932921.html

    相关文章

      网友评论

          本文标题:线程池第一印象~

          本文链接:https://www.haomeiwen.com/subject/kosrtctx.html