美文网首页
线程池与阻塞队列

线程池与阻塞队列

作者: 舞鹤Roc | 来源:发表于2022-01-06 16:35 被阅读0次

    一、线程池

    1、为什么需要使用线程池

    • 减少创建/销毁线程的系统开销
    • 避免抢占系统资源发生阻塞(设置最大线程数、超时时间等)
    • 对线程进行不同的策略的控制(延时、定时等)\color{#FF0000}{通过阻塞队列实现}

    2、如何创建线程池

    • 构造方法:ThreadPoolExecutor提供了四个构造函数,我们只需要弄清楚7参构造函数,便可以理解其他构造函数。
    • 默认实现:通过Executors提供了默认实现的线程池,默认实现是直接或间接配置ThreadPoolExecutor的参数实现的。
    // java.util.concurrent.ThreadPoolExecutor
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    

    3、线程池参数

    • int corePoolSize:线程池中常驻核心线程数
    • int maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1
    • long keepAliveTime:多余的空闲线程的存活时间(当前线程数>corePoolSize时,空闲线程的闲置时间达到keepAliveTime时,会被销毁)
    • TimeUnit unit:keepAliveTime的时间单位(TimeUnit枚举:纳秒~天)
    • BlockingQueue workQueue:任务队列,被提交但尚未被执行的任务(\color{#FF0000}{阻塞队列}
    • ThreadFactory threadFactory:把他当成星期六,因为一般不用,表示生成线程池中工作线程的线程工厂,用于创建线程一般默认即可。
    • RejectedExecutionHandler handler:拒绝策略,表示当workQueue被塞满时候,线程池选择的处理策略。

    4、 拒绝策略

    • AbortPolicy:该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
    • DiscardPolicy:这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
    • DiscardOldestPolicy:这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。
    // java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            //移除队头元素
            e.getQueue().poll();
            //再尝试入队
            e.execute(r);
        }
    }
    
    • CallerRunsPolicy:使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。
    • 自定义:自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。比如自己打日志或者记录到数据库中等等

    5、常见四种线程池

    newFixedThreadPool(nThread) 执行长期任务 =》\color{#FF0000}{LinkedBlockQueue}

    可控制线程最大并发数(同时执行的线程数)
    超出的线程会在队列中等待

    newScheduledThreadPool() 一个任务一个任务执行的场景 =》\color{#FF0000}{DelayedWorkQueue}

    支持定时及周期性任务执行

    newCachedThreadPool() 执行很多短期异步的小程序或者负载较轻的服务器 =》\color{#FF0000}{SynchronousQueue}

    线程数无限制
    有空闲线程则复用空闲线程,若无空闲线程则新建线程
    一定程序减少频繁创建/销毁线程,减少系统开销

    newWorkStealingPool(int) Java8新增,使用目前机器上可用的处理器作为它的并行级别

    ForkJoinPool工作窃取者,使用分治法,它的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”分发到不同的cpu核心上执行,执行完后再把结果收集到一起返回。

    二、阻塞队列

    1、阻塞队列的作用

    在线程池中,往往就会用阻塞队列来保存那些暂时没有空闲线程可以直接执行的任务,等到线程空闲之后再从阻塞队列中弹出任务来执行。一旦队列为空,那么线程就会被阻塞,直到有新任务被插入为止。
    阻塞队列是一种数据结构,用来存储任务,由线程池来控制对阻塞队列的操作(插入、弹出等)。

    2、阻塞队列的核心方法

    // java.util.concurrent.BlockingQueue
    public interface BlockingQueue<E> extends Queue<E> {
        //将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
        boolean add(E e);
        //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
        boolean offer(E e);
        //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
        void put(E e) throws InterruptedException;
        //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
        boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
        //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
        E take() throws InterruptedException;
        //在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
        E poll(long timeout, TimeUnit unit) throws InterruptedException;
        //获取队列中剩余的空间。
        int remainingCapacity();
        //从队列中移除指定的值。
        boolean remove(Object o);
        //判断队列中是否拥有该值。
        public boolean contains(Object o);
        //将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c);
        //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    3、JDK8提供了7个阻塞队列。

    分别是:

    • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。

    如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

    • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。

    基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

    • DelayQueue:一个使用优先级队列实现的无界阻塞队列。

    DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    • SynchronousQueue:一个不存储元素的阻塞队列。

    类似于无中介的直接交易,每一个put操作必须等待take操作,否则不能添加元素。

    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    4、Redis阻塞队列

    redis有一种数据结构是List(有序队列),常用命令为lpush,rpush,lpop,rpop,lrange,brpop,当我们一边 lpush(生成者) 另一边 bpop(消费者),就可以实现一种阻塞队列的用法。

    Redis Brpop 命令移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

    但是这样会潜在着,消费失败,消息丢失的风险,所以不会使用bpop,而是使用BRPOPLPUSHLREM来实现。
    参见redis官方文档:安全的队列

    相关文章

      网友评论

          本文标题:线程池与阻塞队列

          本文链接:https://www.haomeiwen.com/subject/dnlrcrtx.html