一、常见的BlockingQueue
1. ArrayBlockingQueue
由数组支持的有界队列,内部维护一个定长数组。
生产者与消费者公用同一个锁控制数据同步。
处理过程:当生产者往队列存放数据时,当队列达到最大容量时才会阻塞生产者队列,并且唤醒消费者;当消费者将队列元素全部取出,队列为空时才会阻塞消费者,并且唤醒生产者。
先进先出(FIFO)
默认非公平锁,可指定为公平锁。
使用Condition的等待/通知机制,使得ArrayBlockingQueue的数据写入和获取操作足够轻巧,不需分别使用独立生产-消费锁
2. LinkedBlockingQueue
由链接节点支持的可选有界队列,内部维护一个缓冲队列,由链表构成。
生产者与消费者采用独立的锁控制数据同步。
处理过程:当生产者往队列存放数据时,只有队列达到最大容量时(构造函数指定),才会阻塞生产者队列,当消费者从队列消费掉一条数据时,生产者线程会被唤醒。消费者同理,当缓冲区无数据时,阻塞,有数据时唤醒。
注:默认LinkedBlockingQueue构造方法没有指定容量大小,默认无限大Integer.MAX_VALUE,而生产者消费者是同时进行的,所以要避免生产者生产速度远大于消费者消费速度的情况,防止堆内存耗尽。
先进先出(FIFO)
非公平锁
添加和删除队列中的元素的时候,会创建和销毁节点对象,在高并发和大量数据的时候,GC压力很大
3. PriorityBlockingQueue
由优先级堆支持的无界优先级队列,优先级由传入的Compator决定,内部维护一个数组。
处理过程:不会阻塞生产者生产数据,只会在没有可消费的数据时,阻塞消费者。
注:避免生产者生产速度远大于消费者消费速度的情况,防止堆内存耗尽。
非公平锁
4. DelayQueue
由优先级堆支持的、基于时间的无界调度队列
(DelayQueue入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口)
处理过程:只有当期指定的延迟时间到了,才会从队列中获取该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
场景:
缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的
非公平锁
5. SynchronousQueue
无缓冲的阻塞队列,不存储元素。
处理过程:
生产者生产操作必须等待一个消费操作,否则不能继续添加元素。
支持公平锁,非公平锁
特点
ArrayBlockingQueue
生产者与消费者公用一个锁控制数据同步
生产->队列满了->阻塞
消费->队列空了->阻塞
生产与消费是交替进行的,因为是同一把锁。
LinkedBlockingQueue
生产者与消费者采用独立锁控制数据同步
生产->队列满了->阻塞
消费->队列空了->阻塞
生产与消费是同时进行的,有生产就可以消费。
SynchronousQueue
无缓冲队列
生产->消费(未消费则阻塞生产)
生产者生产一个必须等待一个消费操作,否则不能继续添加元素
PriorityBlockingQueue
优先级无界队列,由传入的Compator决定优先级
生产->消费->消费空了->阻塞
生产不阻塞,消费空了阻塞,生产者生产过快容易内存耗尽
DelayQueue
基于时间优先级的无界队列,入队的对象要实现Delayed接口,该接口实现了Comparable接口
生产->延迟时间到了->消费->消费空了->阻塞
生产不阻塞,消费空了阻塞。定时任务可基于该队列处理
二、阻塞的实现
ReentranLock + Condition 实现队列的阻塞,ReentranLock是锁,Condition是条件状态,通过等待/通知机制,来实现线程之间的通信。
ReentranLock + Condition的等待/通知机制和Object的wait()与notify()是类似的,通过synchronized,在锁中使用wait()与notify()达到线程之间通信,在ReentranLock的lock()和unlock()之间通过类似的await()和signal()达到线程之间的通信。
Condition条件队列
条件队列是不会被唤醒竞争锁的,如果条件队列的线程需要被唤醒竞争锁资源需要被转移到CLH等待队列中才可以。
在阻塞队列中A调用put()方法的时候,如果队列已满,则A线程挂起,等待恢复,如果B线程调用take()后,消耗一个队列元素后,会通知put()方法中挂起的A线程,因为这个时候B线程已经消耗一个元素可以在向队列中添加元素。相反如果队列一空 B线程调用tack()方法会阻塞,当A线程调用put()调用后通知tack()方法中的B线程,有元素可以获取。这就是阻塞的实现过程。
三、BlockingQueue的核心方法
1. 生产者生产数据
boolean add(E e);
将指定的元素插入到此队列中,如果成功则返回true,如果当前没有空间可用则抛出IllegalStateException。当使用容量受限的队列时,通常更可取的做法是使用offer()方法。
boolean offer(E e);
将指定的元素插入到此队列中,如果成功则返回true,如果当前没有空间可用则返回false。
void put(E e) throws InterruptedException;
将指定的元素插入到此队列中,如果当前没有空间则阻塞至有空间可用
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
将指定的元素插入到此队列中,如果需要空间可用,则需要等待指定的等待时间。
2. 消费者消费数据
E take() throws InterruptedException;
获取队首元素并删除,如当前没有元素则等待至有元素可获取
E poll(long timeout, TimeUnit unit) throws InterruptedException;
获取队首元素并删除,如果当前没有元素,则需要等待指定时间
int drainTo();
一次性从队列中获取所有可用元素(也可指定个数),并将它们添加到给定集合中。此操作可能比重复轮询此队列更有效率。
参考:https://www.iteye.com/blog/wsmajunfeng-1629354
https://www.jianshu.com/p/32665a52eba1
网友评论