美文网首页
近万字,就是为了和你聊聊线程池

近万字,就是为了和你聊聊线程池

作者: java小杰要加油 | 来源:发表于2020-10-27 22:47 被阅读0次

    更多精彩请关注公众号xhJaver,京东java工程师和你一起成长

    我们知道,在计算机中创建一个线程和销毁一个线程都是十分耗费资源的操作,有一种思想叫做,池化思想,就是说我们创建个池子,把耗费资源的操作都提前做好,后面大家一起用创建好的东西,最后统一销毁。省去了用一次创建一次,销毁一次,这种耗费资源的操作。

    一、线程池工作原理

    线程池就是这种思想,他的基本工作流程如下图所示

    线程池原理流程图.png

    那么他的核心线程,任务队列,这些又是什么呢?怎么设置呢?

    这些就要从代码入手了,我们先来看下线程池构造方法的代码

    二、线程池构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    

    其实ThreadPoolExecutor有四种构造方法,不过底层都是用这个7个参数的构造方法,所以我们弄懂这一个就好了,以下是其他构造方法的底层实现

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
    
       public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
    
      public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
    

    其中默认的拒绝策略是

    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
    

    这些构造方法中的

            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
    
    

    就是那七个参数的构造方法

    有点懵?没关系,接下来我们一个个的解析这七个参数的意思

    三、线程池参数介绍

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    

    1. 第一个参数 corePoolSize 代表这个线程池的核心线程数

    2. 第二个参数 maximumPoolSize 代表这个线程池的最大线程数 (核心线程数 +非核心线程数)

    3. 第三个参数 keepAliveTime 代表这个线程池的非核心线程的空闲时的存活时间

    4. 第四个参数 unit 代表这个线程池的非核心线程的空闲存活时间的单位

    5. 第五个参数 workQueue 代表这个线程池的任务阻塞队列,jdk中有几种常见的阻塞队列

    • ArrayBlockingQueue:基于数组结构的有界阻塞队列
    • LinkedBlockingQueue:是一个基于链表结构的阻塞队列
    • SynchronousQueue :同步队列,只存储一个任务,插入任务时要等待(如果队列里有元素的话)取出任务时要等待(如果队列里没有元素的话)
    • PriorityBlockingQueue:优先级队列,进入队列的元素按照优先级会进行排序

    建议:建议使用有界队列,要是无界队列的话,任务太多的话可能会导致OOM

    6. 第六个参数 threadFactory(可以自定义) 代表这个线程池的创建线程的工厂,有两种

    • Executors.privilegedThreadFactory() 使用访问权限创建一个权限控制的线程。
    • Executors.defaultThreadFactory() 将创建一个同线程组且默认优先级的线程

    7. 第七个参数 handler(可以自定义) 代表这个线程池的拒绝处理任务的饱和策略,jdk默认提供了四种

    • new ThreadPoolExecutor.AbortPolicy(); 直接抛出异常
    • new ThreadPoolExecutor.CallerRunsPolicy(); 用当前调用者的线程中处理传过来的任务
    • new ThreadPoolExecutor.DiscardOldestPolicy(); 丢弃最老的一个任务,然后把传过来的任务加入到阻塞队列中
    • new ThreadPoolExecutor.DiscardPolicy(); 什么都不做,直接丢掉传过来的任务

    四、使用线程池例子

    基础概念也都看了,下面来看个使用线程池处理任务的小例子

    首先,我们先创建个任务类

    public class Task implements Runnable {
        private String taskName;
    
        public Task(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            try {
                //模拟每个任务的耗时
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String name = Thread.currentThread().getName();
            System.out.println("这里是xhJaver,线程池系列 当前线程名字是 " + name+"  处理了  "+ taskName+"  任务");
        }
    }
    
    

    我们再来看测试类

    public class Demo1 {
        public static void main(String[] args) {
            //阻塞队列,设置阻塞任务最多为10个
            BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable> (10);
            //线程工厂
            ThreadFactory threadFactory = Executors.defaultThreadFactory();
            //拒绝策略 当线程池的最大工作线程跑满以及阻塞队列满了的话,会由拒绝策略处理剩下的任务
            ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
            //创建线程池  核心线程数为5  最大线程数为10 非核心线程空闲存活时间为60s
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60L,
                            TimeUnit.SECONDS, blockingQueue, threadFactory, abortPolicy
            );
            for (int i=0;i<10;i++){
                //创建10个任务,如果要是创建>20个任务,则20以外的任务会交由拒绝策略处理
                Task task = new Task("task" + i);
                //让我们自定义的线程池去跑这些任务
                threadPoolExecutor.execute(task);
            }
             //记得要关闭线程池
            threadPoolExecutor.shutdown();
        }
    }
    
    

    输出结果是

    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task0  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-2  处理了  task1  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-3  处理了  task2  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-4  处理了  task3  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-5  处理了  task4  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task5  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-2  处理了  task6  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-5  处理了  task9  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-4  处理了  task8  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-3  处理了  task7  任务
    
    

    五、线程工厂是什么 ?

    文中一直说线程工厂线程工厂,这线程工场到底是干嘛的呢? 当然是创建线程的工厂啦,创建线程,线程当然得有个名字咯,就像刚才的小例子输出的一样,线程的名字是pool-1-thread-3等等,我现在不想叫这个名字了,那就叫thread-xhJaver吧,这是自定义的名字,那怎么自定义呢?

    首先,要实现ThreadFactory接口中的Thread newThread(Runnable r)方法, 传入一个任务,返回一个自定义线程,如下面的代码一样

    public class DIYThreadFactory implements ThreadFactory {
    
        private AtomicInteger atomicInteger;
    
        public  DIYThreadFactory( AtomicInteger atomicInteger){
             this.atomicInteger =  atomicInteger;
        }
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("xhJaver-thread-"+atomicInteger.getAndIncrement());
            return thread;
        }
    }
    
    

    然后在使用时传入这个自定义的线程工厂

    public static void main(String[] args) {
     //阻塞队列,设置阻塞任务最多为10个
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable> (10);
     //创建线程安全的计数器
     AtomicInteger atomicInteger = new AtomicInteger();
     //自定义线程工厂
     ThreadFactory threadFactory = new DIYThreadFactory(atomicInteger);
     //拒绝策略 当线程池的最大工作线程跑满以及阻塞队列满了的话,会由拒绝策略处理剩下的任务
    ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
     //创建线程池  核心线程数为5  最大线程数为10 非核心线程空闲存活时间为60s
     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60L,
       TimeUnit.SECONDS, blockingQueue, threadFactory, abortPolicy
     );
     for (int i=0;i<10;i++){
      //创建10个任务,如果要是创建>20个任务,则20以外的任务会交由拒绝策略处理
      Task task = new Task("task" + i);
      //让我们自定义的线程池去跑这些任务
      threadPoolExecutor.execute(task);
     }
     //记得要关闭线程池
     threadPoolExecutor.shutdown();
    }
    
    

    输出结果是

    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-0  处理了  task0  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-1  处理了  task1  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-4  处理了  task4  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-3  处理了  task3  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-2  处理了  task2  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-0  处理了  task5  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-1  处理了  task6  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-2  处理了  task9  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-3  处理了  task8  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-4  处理了  task7  任务
    
    

    我也学会了自定义线程工厂了,可自定义名字到底有用呢,当然是排查问题啊!把线程名字定义为和自己业务有关的名字,到时候报错的时候就方便排查了。

    六、 拒绝策略是什么

    线程工厂可以自定义,那拒绝策略可以自定义吗?当然可以啦 方法如下,首先也要实现一个RejectedExecutionHandler接口,重写rejectedExecution 这个方法

    public class DIYRejectedHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        //记录日志等操作
            System.out.println("这是xhJaver无法处理的任务  "+r.toString()+"  当前线程名字是 "+Thread.currentThread().getName());
        }
    }
    
    

    然后在使用时传入这个自定义的拒绝策略

    public static void main(String[] args) {
     //阻塞队列,设置阻塞任务最多为10个
     BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable> (10);
     //创建线程安全的计数器
     AtomicInteger atomicInteger = new AtomicInteger();
     //自定义线程工厂
     ThreadFactory threadFactory = new DIYThreadFactory(atomicInteger);
     //自定义拒绝策略 当线程池的最大工作线程跑满以及阻塞队列满了的话,会由拒绝策略处理剩下的任务
     DIYRejectedHandler diyRejectedHandler = new DIYRejectedHandler();
     //创建线程池  核心线程数为5  最大线程数为10 非核心线程空闲存活时间为60s
     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60L,
       TimeUnit.SECONDS, blockingQueue, threadFactory, diyRejectedHandler
     );
     for (int i=0;i<30;i++){
      //创建10个任务,如果要是创建>20个任务,则20以外的任务会交由拒绝策略处理
      Task task = new Task("task" + i);
      //让我们自定义的线程池去跑这些任务
      threadPoolExecutor.execute(task);
     }
     //记得要关闭线程池
     threadPoolExecutor.shutdown();
    }
    
    

    输出结果是

    这是xhJaver无法处理的任务  Task{taskName='task20'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task21'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task22'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task23'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task24'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task25'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task26'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task27'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task28'}  当前线程名字是 main
    这是xhJaver无法处理的任务  Task{taskName='task29'}  当前线程名字是 main
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-5  处理了  task15  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-4  处理了  task4  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-3  处理了  task3  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-2  处理了  task2  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-1  处理了  task1  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-0  处理了  task0  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-9  处理了  task19  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-8  处理了  task18  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-7  处理了  task17  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-6  处理了  task16  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-4  处理了  task6  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-5  处理了  task5  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-3  处理了  task7  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-2  处理了  task8  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-1  处理了  task9  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-0  处理了  task10  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-9  处理了  task11  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-8  处理了  task12  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-7  处理了  task13  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-6  处理了  task14  任务
    
    

    七、常见的阻塞队列及注意点

    因为阻塞队列的知识太多了,后续我们会单独开篇来讲这个阻塞队列,先介绍几个常用的

    1.ArrayBlockingQueue 基于数组的有界队列

    2.LinkedBlockingQueue 基于链表的无界队列

    3.SynchronousQueue

    它内部只有一个元素,插入时如果发现内部有元素未被取走则阻塞,取元素时若队列没有元素则被阻 塞,直到有元素插入进来。

    搭配线程池使用如下 ,先创建任务类

    public class Task implements Runnable {
        private String taskName;
    
        public Task(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public String toString() {
            return "Task{" +
                    "taskName='" + taskName + '\'' +
                    '}';
        }
    
        @Override
        public void run() {
            try {
                //模拟每个任务的耗时
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String name = Thread.currentThread().getName();
            System.out.println("这里是xhJaver,线程池系列 当前线程名字是 " + name+"  处理了  "+ taskName+"  任务");
        }
    }
    
    

    再使用阻塞队列

    public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            for (int i=0;i<10;i++){
                //创建十个任务
                Task task = new Task("task" + i);
                //去跑任务
                executorService.execute(task);
            }
             //记得要关闭线程池
            executorService.shutdown();
        }
    
    

    其中newCachedThreadPool底层就使用的是SynchronousQueue

     public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    

    输出结果是

    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task0  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-2  处理了  task1  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-5  处理了  task4  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-4  处理了  task3  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-3  处理了  task2  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-6  处理了  task5  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-7  处理了  task6  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-10  处理了  task9  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-9  处理了  task8  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-8  处理了  task7  任务
    
    

    由此可见,线程池分别创建了十个线程来处理这十个任务,为什么呢? 这是因为,我每个任务的模拟处理时间是1s,当再来的任务发现阻塞队列中有任务还没被取走,就创建非核心线程处理刚来的这个任务,不断的来任务,不断的创建线程,所以用这个阻塞队列再搭配线程池的总线程数等参数设置可能会因为不断的创建线程而导致OOM。

    4.PriorityBlockingQueue 优先级队列 进入队列的元素会按照任务的优先级排序。并且必须实现Comparable接口。

    参数:priorityTask - 要比较的对象。 返回:负整数、零或正整数, 根据此对象是小于、等于还是大于指定对象(要比较的对象)。

    先创建一个带有优先级的任务

    public class PriorityTask implements Runnable , Comparable<PriorityTask>{
    
        private String taskName;
        // 优先级,根据这个数进行排序
        private Integer priority;
    
        public PriorityTask(Integer priority,String taskName) {
            this.priority = priority;
            this.taskName = taskName;
        }
    
        //这个compareTo方法的返回值如果是-1的话,则排序会认为传过来的任务比此任务的大,降序排列
        //这个compareTo方法的返回值如果是1的话,则排序会认为传过来的任务比此任务的小,升序排列
        @Override
        public int compareTo(PriorityTask priorityTask) {
            //Integer.compare返回 -1代表 传过来的任务的priority 比次任务的priority要小
            // Integer.compare 0   传过来的任务的priority 比次任务的priority一样大
            //Integer.compare  1   传过来的任务的priority 比次任务的priority要大
            return Integer.compare(priorityTask.priority,this.priority);
        }
    
        @Override
        public void run() {
            try {
                //模拟每个任务的耗时
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String name = Thread.currentThread().getName();
            System.out.println("这里是xhJaver,线程池系列 当前线程名字是 " + name+"  处理了  "+ taskName+"  任务");
        }
    
        @Override
        public String toString() {
            return "Task{" +
                    "taskName='" + taskName + '\'' +
                    '}';
        }
    
    }
    
    

    Integer.compare 的 比较大小代码

    java
       public static int compare(int x, int y) {
            return (x < y) ? -1 : ((x == y) ? 0 : 1);
        }
    
    

    测试代码

    
      public static void main(String[] args) {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
                    60L, TimeUnit.SECONDS,
                    new PriorityBlockingQueue());
            for (int i=0;i<5;i++){
                //创建十个任务
                PriorityTask priorityTask = new PriorityTask(i,"task" + i);
                //去跑任务
                threadPoolExecutor.execute(priorityTask);
            }
            for (int i=100;i>=95;i--){
                //创建十个任务
                PriorityTask priorityTask = new PriorityTask(i,"task" + i);
                //去跑任务
                threadPoolExecutor.execute(priorityTask);
            }
             //记得要关闭线程池
            threadPoolExecutor.shutdown();
        }
    
    

    输出结果是

    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task0  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task100  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task99  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task98  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task97  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task96  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task95  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task4  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task3  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task2  任务
    这里是xhJaver,线程池系列 当前线程名字是 pool-1-thread-1  处理了  task1  任务
    
    

    由输出结果可见,除了第一个以外,处理任务的顺序会按照优先级大小先处理

    八、几种常见的线程池及注意点

    他们分别是以下几种

    1.newFixedThreadPool

    • Executors.newFixedThreadPool(10) 它的构造方法是
     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    

    有此可见,这个FixedThreadPool线程池的核心线程数和最大线程数一样,所以就没有非核心线程数,存活时间这个参数也就是无效的了,它底层用的是LinkedBlockingQueue这个阻塞队列,这个队列是个无界队列,可以点进去源码看它默认的容量是Integer.MAX_VALUE

     public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
    

    所以这会导致一个什么问题呢?就会导致,当核心线程都跑满的时候,再来新任务的话就会不断的添加至这个阻塞队列里面,一直加一直加,但是内存是有限的,所以有可能会出现 OOM(OutOfMemory) 的问题

    • Executors.newFixedThreadPool(10,Executors.defaultThreadFactory()); 这个构造方法可以传过来指定的创建线程的工厂
     public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
    

    2.newCachedThreadPool

    • Executors.newCachedThreadPool() 它的构造方法是
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    

    由此可见,它的核心线程数默认是0,线程池总线程容量是Integer.MAX_VALUE,阻塞队列用的是SynchronousQueue同步队列,非核心线程数的空闲存活时间为60s,这会导致一个什么问题呢?只要来了一个任务,如果没有线程的话就创建一个非核心线程去跑这个任务,如果跑着的过程中又来了一个任务,就会继续创建线程去跑,以此类推,内存是有限的,不断的创建线程的话也会触发OOM问题

    • Executors.newCachedThreadPool(Executors.defaultThreadFactory()) 这个构造方法可以传过来指定的创建线程的工厂
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    
    

    3.newSingleThreadExecutor

    • Executors.newSingleThreadExecutor() 它的构造方法是
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    

    我们可以看出,它的核心线程数是一个,总线程数也是一个。底层用的是LinkedBlockingQueue阻塞队列 当来任务的时候线程池如果没有线程的话,则创建一个也是唯一一个线程来执行任务,剩下的任务都会被塞进无界阻塞队列里面,也是会有可能产生OOM问题。

    • Executors.newSingleThreadExecutor(Executors.defaultThreadFactory()) 这个构造方法可以传过来指定的创建线程的工厂
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
    

    九、拓展线程池

    什么?线程池还可以拓展?!是的,如果我想记录下每个任务的执行开始情况,结束情况,线程池关闭情况就要拓展啦,ThreadPoolExecutor它内部是提供了几个方法给我们拓展,其中beforeExecute、afterExecute、terminated,这三个分别对应任务开始,任务结束,线程池关闭的三种情况,所以我们就要重写他们啦,话不多说,看下代码

    public static void main(String[] args) {
     //阻塞队列,设置阻塞任务最多为10个
     BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable> (10);
     //创建线程安全的计数器
     AtomicInteger atomicInteger = new AtomicInteger();
     //自定义线程工厂
     ThreadFactory threadFactory = new DIYThreadFactory(atomicInteger);
     //自定义拒绝策略 当线程池的最大工作线程跑满以及阻塞队列满了的话,会由拒绝策略处理剩下的任务
     DIYRejectedHandler diyRejectedHandler = new DIYRejectedHandler();
     //创建线程池  核心线程数为5  最大线程数为10 非核心线程空闲存活时间为60s
     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60L,
       TimeUnit.SECONDS, blockingQueue, threadFactory, diyRejectedHandler
     ){
      @Override
      protected void beforeExecute(Thread t, Runnable r) {
       System.out.println("xhJaver 当前线程是"+t.getName()+"开始处理任务:"+r.toString());
      }
    
      @Override
      protected void afterExecute(Runnable r, Throwable t) {
       if(t!=null){
        System.out.println("xhJaver 当前线程是"+Thread.currentThread().getName() +"处理任务结束:"+r.toString()+" 错误是 "+ t);
       }
       System.out.println("xhJaver 当前线程是"+Thread.currentThread().getName() +"处理任务结束:"+r.toString()+" 没有错误 ");
    
      }
    
      @Override
      protected void terminated() {
       System.out.println("xhJaver 当前线程是"+Thread.currentThread().getName() +"关闭线程池");
      }
     };
     for (int i=0;i<21;i++){
      //创建10个任务,如果要是创建>20个任务,则20以外的任务会交由拒绝策略处理
      Task task = new Task("task" + i);
      //让我们自定义的线程池去跑这些任务
      threadPoolExecutor.execute(task);
     }
     //记得要关闭线程池
     threadPoolExecutor.shutdown();
    }
    
    

    输出结果是

    这是xhJaver无法处理的任务  Task{taskName='task20'}  当前线程名字是 main
    xhJaver 当前线程是xhJaver-thread-7开始处理任务:Task{taskName='task17'}
    xhJaver 当前线程是xhJaver-thread-6开始处理任务:Task{taskName='task16'}
    xhJaver 当前线程是xhJaver-thread-9开始处理任务:Task{taskName='task19'}
    xhJaver 当前线程是xhJaver-thread-4开始处理任务:Task{taskName='task4'}
    xhJaver 当前线程是xhJaver-thread-8开始处理任务:Task{taskName='task18'}
    xhJaver 当前线程是xhJaver-thread-2开始处理任务:Task{taskName='task2'}
    xhJaver 当前线程是xhJaver-thread-3开始处理任务:Task{taskName='task3'}
    xhJaver 当前线程是xhJaver-thread-5开始处理任务:Task{taskName='task15'}
    xhJaver 当前线程是xhJaver-thread-0开始处理任务:Task{taskName='task0'}
    xhJaver 当前线程是xhJaver-thread-1开始处理任务:Task{taskName='task1'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-4  处理了  task4  任务
    xhJaver 当前线程是xhJaver-thread-4处理任务结束:Task{taskName='task4'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-4开始处理任务:Task{taskName='task5'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-9  处理了  task19  任务
    xhJaver 当前线程是xhJaver-thread-9处理任务结束:Task{taskName='task19'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-6  处理了  task16  任务
    xhJaver 当前线程是xhJaver-thread-6处理任务结束:Task{taskName='task16'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-9开始处理任务:Task{taskName='task6'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-7  处理了  task17  任务
    xhJaver 当前线程是xhJaver-thread-7处理任务结束:Task{taskName='task17'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-7开始处理任务:Task{taskName='task8'}
    xhJaver 当前线程是xhJaver-thread-6开始处理任务:Task{taskName='task7'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-1  处理了  task1  任务
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-8  处理了  task18  任务
    xhJaver 当前线程是xhJaver-thread-8处理任务结束:Task{taskName='task18'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-8开始处理任务:Task{taskName='task9'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-2  处理了  task2  任务
    xhJaver 当前线程是xhJaver-thread-2处理任务结束:Task{taskName='task2'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-2开始处理任务:Task{taskName='task10'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-3  处理了  task3  任务
    xhJaver 当前线程是xhJaver-thread-3处理任务结束:Task{taskName='task3'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-5  处理了  task15  任务
    xhJaver 当前线程是xhJaver-thread-5处理任务结束:Task{taskName='task15'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-5开始处理任务:Task{taskName='task12'}
    xhJaver 当前线程是xhJaver-thread-1处理任务结束:Task{taskName='task1'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-1开始处理任务:Task{taskName='task13'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-0  处理了  task0  任务
    xhJaver 当前线程是xhJaver-thread-3开始处理任务:Task{taskName='task11'}
    xhJaver 当前线程是xhJaver-thread-0处理任务结束:Task{taskName='task0'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-0开始处理任务:Task{taskName='task14'}
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-4  处理了  task5  任务
    xhJaver 当前线程是xhJaver-thread-4处理任务结束:Task{taskName='task5'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-6  处理了  task7  任务
    xhJaver 当前线程是xhJaver-thread-6处理任务结束:Task{taskName='task7'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-9  处理了  task6  任务
    xhJaver 当前线程是xhJaver-thread-9处理任务结束:Task{taskName='task6'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-7  处理了  task8  任务
    xhJaver 当前线程是xhJaver-thread-7处理任务结束:Task{taskName='task8'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-2  处理了  task10  任务
    xhJaver 当前线程是xhJaver-thread-2处理任务结束:Task{taskName='task10'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-8  处理了  task9  任务
    xhJaver 当前线程是xhJaver-thread-8处理任务结束:Task{taskName='task9'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-5  处理了  task12  任务
    xhJaver 当前线程是xhJaver-thread-5处理任务结束:Task{taskName='task12'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-0  处理了  task14  任务
    xhJaver 当前线程是xhJaver-thread-0处理任务结束:Task{taskName='task14'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-3  处理了  task11  任务
    xhJaver 当前线程是xhJaver-thread-3处理任务结束:Task{taskName='task11'} 没有错误 
    这里是xhJaver,线程池系列 当前线程名字是 xhJaver-thread-1  处理了  task13  任务
    xhJaver 当前线程是xhJaver-thread-1处理任务结束:Task{taskName='task13'} 没有错误 
    xhJaver 当前线程是xhJaver-thread-1关闭线程池
    
    

    更多精彩请关注公众号xhJaver,京东java工程师和你一起成长

    相关文章

      网友评论

          本文标题:近万字,就是为了和你聊聊线程池

          本文链接:https://www.haomeiwen.com/subject/uwufvktx.html