美文网首页一些收藏
Java并发编程 线程池

Java并发编程 线程池

作者: 香沙小熊 | 来源:发表于2020-03-04 19:34 被阅读0次

    1. 线程池的自我介绍

    线程池的重要性
    什么是池
    复用我们的线程
    控制我们资源的总量

    如果不使用线程池,每个任务都新开一个线程处理
    • 一个线程
    • for循环创建线程
    • 当任务数量上升到1000
    public class EveryTaskOneThread {
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                Thread thread = new Thread(new Task());
                thread.start();
            }
        }
    
        static class Task implements Runnable {
            @Override
            public void run() {
                System.out.println("执行了任务");
            }
        }
    }
    

    这样开销太大,我们希望有固定数量的线程,来执行者1000个线程,这样就避开了反复创建并销毁线程所带来的开销问题。

    为什么要使用线程池?

    1.反复创建开销大
    2.过多的线程会占用太多内存
    解决以上两个问题的思路

    • 用少量的线程-避免内存占用过多
    • 让这部分线程都保持工作,且可以反复执行任务-避免生命周期的损耗
    线程池的好处
    • 加快响应速度
    • 合理利用CPU和内存
    • 统一管理
    线程池适合应用的场景
    • 服务器接收到大量请求时,使用线程池技术是非常合适的。
    • 在开中,如果需要创建5个以上的线程,就可以使用线程池来管理。

    2. 创建和停止线程池

    线程池构造函数的参数
    public ThreadPoolExecutor(
                                  int corePoolSize,        //第1个参数
                                  int maximumPoolSize, //第2个参数
                                  long keepAliveTime, //第3个参数
                                  TimeUnit unit, //第4个参数
                                  BlockingQueue<Runnable> workQueue, //第5个参数
                                  ThreadFactory threadFactory, //第6个参数
                                  RejectedExecutionHandler handler) { //第7个参数
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 || 
                maximumPoolSize < corePoolSize || 
                keepAliveTime < 0) // 第一处
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)//第二处
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    corePoolSize

    表示常驻核心线程数。如果等于0,则任务执行完成后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。这个值的设置非常关键,设置过大会浪费资源,设置的过小会导致线程频繁地创建或销毁。

    maximumPoolSize

    表示线程池能够容纳同时执行的最大线程数。从上方的示例代码中第一处来看,必须大于或等于1。如果待执行的线程数大于此值,需要借助第5个参数的帮助。缓存在队列中。如果maximumPoolSize 与corePoolSize 相等,即是固定大小线程池。

    keepAliveTime

    表示线程池中的线程空闲时间,当空闲时间达到KeepAliveTime 值时,线程被销毁,直到剩下corePoolSize 个线程为止,避免浪费内存和句柄资源。在默认情况下,当线程池的线程大于corePoolSize 时,keepAliveTime 才会起作用。但是ThreadPoolExecutor的allowCoreThreadTimeOut 变量设置为ture时,核心线程超时后也会被回收。

    TimeUnit

    表示时间单位。keepAliveTime 的时间单位通常是TimeUnit.SECONDS。

    workQueue

    表示缓存队列。当请求的线程数大于maximumPoolSize时,线程进入BlockingQueue 阻塞队列。
    有3种最常见的队列类型
    1)直接交换:SynchronousQueue(实际存不下任务)
    2)无界队列:LinkedBlockingQueue
    3)有界队列:ArrayBlockingQueue

    threadFactory

    表示线程工厂。它用来生产一组相同任务的线程。线程池的命名是通过给这个factory增加组名前缀来实现的。在虚拟机栈分析时,就可以知道线程任务是由哪个线程工厂产生的。
    新线程是有ThreadFactory创建的, 默认使用 Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY,那么就可以改变线程名、线程组、优先级、是否是守护线程等。

    handler

    表示执行拒绝策略的对象。当超过第5个参数workQueue的任务缓存区上限的时候,就可以通过该策略处理请求,这是一种简单的限流保护。

    增减线程的特点
    1. 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
    2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
    3. 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
    4. 是只有在队列填满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
    线程池里的线程数量设定为多少比较合适
    • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数1-2倍
    • 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:
    • 线程数=CPU核心数*(1+平均等待时间/平均工作时间)

    3. 常见线程池的特点和用法

    • newFixedThreadPool
      采用LinkedBlockingQueue 容易发生OOM
    public class FixedThreadPoolOOM {
    
        private static ExecutorService executorService = Executors.newFixedThreadPool(1);
    
        public static void main(String[] args) {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                executorService.execute(new SubThread());
            }
        }
    
    }
    
    class SubThread implements Runnable {
    
    
        @Override
        public void run() {
            try {
                Thread.sleep(1000000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    VM 参数

    -Xmx8m -Xms8m //将内存调小
    
    image.png
    • newSingleThreadExecutor
      采用LinkedBlockingQueue 容易发生OOM

    • newCachedThreadPool
      可缓存线程池
      特点 无界线程池,具有自动回收多余线程的功能
      maximumPoolSize 为 Integer.MAX_VALUE 容易发生OOM

    • newScheduledThreadPool
      maximumPoolSize 为 Integer.MAX_VALUE 容易发生OOM

    • newWorkStealingPool
      适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中

    4. 停止线程池的正确方法

    1.shutdown
    将线程池状态置为SHUTDOWN,并不会立即停止:

    • 停止接收外部submit的任务
    • 内部正在跑的任务和队列里等待的任务,会执行完
    • 等到第二步完成后,才真正停止
    public class ShutDown {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 100; i++) {
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(1500);
    
    
            executorService.shutdown();
            executorService.execute(new ShutDownTask());
        }
    }
    
    class ShutDownTask implements Runnable {
    
    
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + "被中断了");
            }
        }
    }
    
    image.png
    1. shutdownNow()
      将线程池状态置为STOP。企图立即停止,事实上不一定:
    • 跟shutdown()一样,先停止接收外部提交的任务
    • 忽略队列里等待的任务
    • 尝试将正在跑的任务interrupt中断
    • 返回未执行的任务列表

    它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。
    但是大多数时候是能立即退出的

    public class ShutDown {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 100; i++) {
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(1500);
            executorService.shutdownNow();
    //        List<Runnable> runnableList = executorService.shutdownNow();
            boolean b = executorService.awaitTermination(9L, TimeUnit.SECONDS);
            System.out.println(b);
            System.out.println(executorService.isTerminated());
        }
    }
    
    class ShutDownTask implements Runnable {
    
    
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + "被中断了");
            }
        }
    }
    
    image.png

    3.isShutdown
    线程池状态是否为SHUTDOWN
    4.isTerminated
    线程池程已停止
    5.awaitTermination

    • 等所有已提交的任务(包括正在跑的和队列中等待的)执行完
    • 或者等超时时间到
    • 或者线程被中断,抛出InterruptedException
      然后返回true(shutdown请求后所有任务执行完毕)或false(已超时)

    5. 任务太多,怎么决绝?

    拒绝时间

    1.当Executor关闭时,提交新任务会被拒绝。
    2.以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。

    线程池拒绝策略,通常有以下四种:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务 ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

    1. 钩子方法,给线程池加点料
    • 每个任务执行前后
    • 日志、统计
    public class PauseableThreadPool extends ThreadPoolExecutor {
    
        private final ReentrantLock lock = new ReentrantLock();
        private Condition unpaused = lock.newCondition();
        private boolean isPaused;
    
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                   TimeUnit unit,
                                   BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                   TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                   TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                   RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                   TimeUnit unit, BlockingQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
                    handler);
        }
    
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            lock.lock();
            try {
                while (isPaused) {
                    unpaused.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        private void pause() {
            lock.lock();
            try {
                isPaused = true;
            } finally {
                lock.unlock();
            }
        }
    
        public void resume() {
            lock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>());
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("我被执行");
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            for (int i = 0; i < 10000; i++) {
                pauseableThreadPool.execute(runnable);
            }
            Thread.sleep(1500);
            pauseableThreadPool.pause();
            System.out.println("线程池被暂停了");
            Thread.sleep(1500);
            pauseableThreadPool.resume();
            System.out.println("线程池被恢复了");
    
        }
    }
    
    ...
    我被执行
    我被执行
    我被执行
    我被执行
    线程池被暂停了
    线程池被恢复了
    我被执行
    我被执行
    ...
    

    6. 实现原理、源码分析

    线程池组成部分
    • 线程池管理器
    • 工作线程
    • 任务队列
    • 任务接口(Task)
    线程池实现任务复用的原理
    • 相同线程执行不同任务
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        
    

    线程池线程复用的主要代码如上;

    具体分析如下:

    如下图,假设是定长线程池定长为3,

    1.那么线程池会根据核心线程的长度来创建线程,这样就固定了线程数量,

    2.然后在runWorker的时候,while循环不停的去缓存队列中查找task;

    3.有task,则运行runlable的run方法;

    4.如果没有task,则执行 processWorkerExit(w, completedAbruptly),删除当前worker(移除workers中该worker的对象(HashSet<Worker> workers = new HashSet<Worker>();))以及更新workcount等操作

    6. 线程池状态

    1、RUNNING

    (1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
    (02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    1
    2、 SHUTDOWN

    (1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
    (2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

    3、STOP

    (1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
    (2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

    4、TIDYING

    (1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
    (2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
    当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

    5、 TERMINATED

    (1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
    (2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

    8.使用线程池的注意点

    • 避免任务堆积
    • 避免线程数过度增加
    • 排查线程泄露

    相关文章

      网友评论

        本文标题:Java并发编程 线程池

        本文链接:https://www.haomeiwen.com/subject/xtjhlhtx.html