美文网首页
4、阻塞队列和线程池原理

4、阻塞队列和线程池原理

作者: 咸鱼Jay | 来源:发表于2022-03-02 14:14 被阅读0次

    阻塞队列和线程池原理

    阻塞队列

    队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

    在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

    什么是阻塞队列

    1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    2. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

    为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
    插入方法 add(e) offer(e) put(e) offer(e,time,unit)
    移除入方法 remove() poll() take() poll(time,unit)
    检查方法 element() peek() 不可用 不可用
    • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queuefull")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。

    • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。

    • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。

    • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

    常用阻塞队列

    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
    • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。

    有界无界?

    有限队列就是长度有限,满了以后生产者会阻塞,无界队列就是里面能放无数的东西而不会因为队列长度限制被阻塞,当然空间限制来源于系统资源的限制,如果处理不及时,导致队列越来越大越来越大,超出一定的限制致使内存超限,操作系统或者JVM帮你解决烦恼,直接把你 OOM kill 省事了。

    无界也会阻塞,为何?因为阻塞不仅仅体现在生产者放入元素时会阻塞,消费者拿取元素时,如果没有元素,同样也会阻塞。

    ArrayBlockingQueue

    是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置

    LinkedBlockingQueue

    是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

    Array实现和Linked实现的区别

    1. 队列中锁的实现不同
      ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
      LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock
    2. 在生产或消费时操作不同
      ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;
      LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node<E>进行插入或移除,会影响性能
    3. 队列大小初始化方式不同
      ArrayBlockingQueue实现的队列中必须指定队列的大小;
      LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE

    PriorityBlockingQueue

    PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

    DelayQueue

    是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

    DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

    ==缓存系统的设计==:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

    SynchronousQueue

    是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。

    LinkedTransferQueue

    多了tryTransfertransfer方法,

    1. transfer方法
      如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

    2. tryTransfer方法
      tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。

    LinkedBlockingDeque

    LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

    多了addFirst、addLast、offerFirst、offerLast、peekFirstpeekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有FirstLast后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。

    线程池

    为什么要用线程池?

    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

    1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
    3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

    ThreadPoolExecutor 的类关系

    Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

    ExecutorService接口继承了Executor,在其上做了一些shutdown()submit()的扩展,可以说是真正的线程池接口;

    AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;

    ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

    ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService

    ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutorTimer更灵活,功能更强大。

    线程池的创建各个参数含义

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
    

    corePoolSize

    线程池中的核心线程数,默认情况下,核心线程一直存活在线程池中,即便他们在 线程池中处于闲置状态。除非我们将ThreadPoolExecutorallowCoreThreadTimeOut属性设为true的时候,这时候处于闲置的核心线程在等待新任务到来时会有超时策略,这个超时时间由keepAliveTime来指定。一旦超过所 设置的超时时间,闲置的核心线程就会被终止。

    maximumPoolSize

    线程池中所容纳的最大线程数,如果活动的线程达到这个数值以后,后续的新任务 将会被阻塞。包含核心线程数+非核心线程数。

    keepAliveTime

    非核心线程闲置时的超时时长,对于非核心线程,闲置时间超过这个时间,非核心 线程就会被回收。只有对ThreadPoolExecutorallowCoreThreadTimeOut属性设 为true的时候,这个超时时间才会对核心线程产生效果。

    TimeUnit

    keepAliveTime的时间单位。他是一个枚举,可以使用的单位有天(TimeUnit.DAYS),小时(TimeUnit.HOURS),分钟(TimeUnit.MINUTES),毫秒(TimeUnit.MILLISECONDS),微秒 (TimeUnit.MICROSECONDS, 千分之一毫秒)和毫微秒 (TimeUnit.NANOSECONDS, 千分之一微秒);

    workQueue

    workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。

    一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

    1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
    2. 由于1,使用无界队列时maximumPoolSize将是一个无效参数。
    3. 由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
    4. 更重要的,使用无界queue可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。

    threadFactory

    创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。

    Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。

    RejectedExecutionHandler

    线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

    1. AbortPolicy:直接抛出RejectedExecutionException异常,默认策略;
    2. CallerRunsPolicy:用调用者所在的线程来执行任务;
    3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4. DiscardPolicy:直接丢弃任务,不进行处理;

    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

    线程池的工作机制

    1. 如果在线程池中的线程数量没有达到核心的线程数量,这时候就回启动一个核心线程来执行任务。

    2. 如果线程池中的线程数量已经超过核心线程数,这时候任务就会被插入到任务队列中排队等待执行。

    3. 由于任务队列已满,无法将任务插入到任务队列中。这个时候如果线程池中的线程数量没有达到线程池所设定的最大值,那么这时候就会立即启动一个非核心线程 来执行任务。

    4. 如果线程池中的数量达到了所规定的最大值,那么就会拒绝执行此任务,这时候就会调用RejectedExecutionHandler中的rejectedExecution方法来通知调用者。

    ThreadPoolExecutor的使用

    ExecutorService service = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    

    对于ThreadPoolExecutor有多个构造方法,对于上面的构造方法中的其他参数都采 用默认值。可以通过executesubmit两种方式来向线程池提交一个任务。

    execute()

    当我们使用execute来提交任务时,由于execute方法没有返回值,所以说 我们也就无法判定任务是否被线程池执行成功。

    service.execute(new Runnable() {
        public void run() {
            System.out.println("execute方式");
        }
    });
    

    submit()

    用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

    Future<Integer> future = service.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            System.out.println("submit方式");
            return 2;
        }
    });
    try {
        Integer number = future.get();
    } catch (ExecutionException e) { // TODO Auto-generated catch block 
        e.printStackTrace();
    }
    

    关闭线程池

    调用线程池的shutdown()shutdownNow()方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别:

    shutdown原理:将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

    shutdownNow原理:将线程池的状态设置成STOP状态,然后中断所有任务(包括正 在执行的)的线程,并返回等待执行任务的列表。

    中断采用interrupt方法,所以无法响应中断的任务可能永远无法终止。 但调用上述 的两个关闭之一,isShutdown()方法返回值为true,当所有任务都已关闭,表示线 程池关闭完成,则isTerminated()方法返回值为true。当需要立刻中断所有的线程, 不一定需要执行完任务,可直接调用shutdownNow()方法。

    四种线程池类

    Java中四种具有不同功能常见的线程池。他们都是直接或者间接配置ThreadPoolExecutor来实现他们各自的功能。这四种线程池分别是newFixedThreadPool,newCachedThreadPool,newScheduledThreadPoolnewSingleThreadExecutor。这四个线程池可以通过Executors类获取。

    1. newFixedThreadPool

    通过Executors中的newFixedThreadPool方法来创建,该线程池是一种线程数量固 定的线程池。

    ExecutorService service = Executors.newFixedThreadPool(4);
    

    在这个线程池中 所容纳最大的线程数就是我们设置的核心线程数。 如果线程池的 线程处于空闲状态的话,它们并不会被回收,除非是这个线程池被关闭。如果所有 的线程都处于活动状态的话,新任务就会处于等待状态,直到有线程空闲出来。

    由于newFixedThreadPool只有核心线程,并且这些线程都不会被回收,也就是它能够更快速的响应外界请求。从下面的newFixedThreadPool方法的实现可以看出,newFixedThreadPool只有核心线程,并且不存在超时机制,采用LinkedBlockingQueue,所以对于任务队列的大小也是没有限制的。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS, 
            new LinkedBlockingQueue<Runnable>()); 
    }
    

    2. newCachedThreadPool

    通过Executors中的newCachedThreadPool方法来创建。

    public static ExecutorService newCachedThreadPool() { 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    }
    

    通过上面的newCachedThreadPool方法在这里我们可以看出它的核心线程数为0,线程池的最大线程数Integer.MAX_VALUE。而Integer.MAX_VALUE是一个很大 的数,也差不多可以说 这个线程池中的最大线程数可以任意大。

    当线程池中的线程都处于活动状态的时候,线程池就会创建一个新的线程来处理任务。该线程池中的线程超时时长为60秒,所以当线程处于闲置状态超过60秒的时候便会被回收。这也就意味着若是整个线程池的线程都处于闲置状态超过60秒以后, 在newCachedThreadPool线程池中是不存在任何线程的,所以这时候它几乎不占 用任何的系统资源。

    对于newCachedThreadPool他的任务队列采用的是SynchronousQueue,上面说到在SynchronousQueue内部没有任何容量的阻塞队列。SynchronousQueue内部相当于一个空集合,我们无法将一个任务插入到SynchronousQueue中。所以说在线程池中如果现有线程无法接收任务,将会创建新的线程来执行任务。

    3. newScheduledThreadPool

    通过Executors中的newScheduledThreadPool方法来创建。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize); 
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) { 
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
            new DelayedWorkQueue()); 
    }
    

    它的核心线程数是固定的,对于非核心线程几乎可以说是没有限制的,并且当非核 心线程处于限制状态的时候就会立即被回收。

    创建一个可定时执行或周期执行任务的线程池:

    ScheduledExecutorService service = Executors.newScheduledThreadP ool(4);
    service.schedule(new Runnable() { 
        public void run() {
            System.out.println(Thread.currentThread().getName()+"延迟 三秒执行"); 
        }
    }, 3, TimeUnit.SECONDS);
    service.scheduleAtFixedRate(new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread().getName()+"延迟 三秒后每隔2秒执行"); 
        }
    }, 3, 2, TimeUnit.SECONDS);
    

    输出结果:

    pool-1-thread-2延迟三秒后每隔2秒执行
    pool-1-thread-1延迟三秒执行
    pool-1-thread-1延迟三秒后每隔2秒执行
    pool-1-thread-2延迟三秒后每隔2秒执行
    pool-1-thread-2延迟三秒后每隔2秒执行

    schedule(Runnable command, long delay, TimeUnit unit) :延迟一定时间 后执行Runnable任务;

    schedule(Callable callable, long delay, TimeUnit unit) :延迟一定时 间后执行Callable任务;

    scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :延迟一定时间后,以间隔period时间的频率周期性地 执行任务;

    scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit):与scheduleAtFixedRate()方法很类似,但是不同的是scheduleWithFixedDelay()方法的周期时间间隔是以上一个任务执行结束到下一个任务开始执行的间隔,而scheduleAtFixedRate()方法的周期时间间隔是以上一个任务开始执行到下一个任务开始执行的间隔,也就是这一些任务系列的触发时间都是 可预知的。

    ScheduledExecutorService功能强大,对于定时执行的任务,建议多采用该方法。

    4. newSingleThreadExecutor

    通过Executors中的newSingleThreadExecutor方法来创建,在这个线程池中只有一个核心线程,对于任务队列没有小限制,也就意味着这一个任务处于活动状态 时,其他任务都会在任务队列中排队等候依次执行。

    newSingleThreadExecutor将所有的外界任务统一到一个线程中支持,所以在这个 任务执行之间我们不需要处理线程同步的问题。

    public static ExecutorService newSingleThreadExecutor() { 
        return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L,
        TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue<Runnable>())); 
    }
    

    合理地配置线程池

    要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

    • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
    • 任务的优先级:高、中和低。
    • 任务的执行时间:长、中和短。
    • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

    性质不同的任务可以用不同规模的线程池分开处理。

    任务类别 说明
    CPU密集型任务 应配置尽可能小的线程,如配置N+1个线程的线程池。
    IO密集型任务 由于IO操作速度远低于CPU速度,那么在运行这类任务时,CPU绝大多数时间处于空闲状态,那么线程池可以配置尽量多些的线程,以提高CPU利用率,如2*N。
    混合型的任务 如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

    优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

    执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

    建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。

    如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。

    Java线程池实现原理及其在美团业务中的实践

    相关文章

      网友评论

          本文标题:4、阻塞队列和线程池原理

          本文链接:https://www.haomeiwen.com/subject/iezlrrtx.html