image.pngLinkedBlockingQueue是一个单向链表结构的阻塞队列,继承了抽象类AbstractQueue,实现了BlockingQueue和Serializable接口。因为底层是链表结构,所以优点是存取快,缺点是查询慢。先存先取。
构造方法和成员变量
数据单元Node
static class Node<E> {
//数据
E item;
//下一个元素
Node<E> next;
//构造方法
Node(E x) { item = x; }
}
成员变量
//队列的最大容量
private final int capacity;
//这是一个Integer的原子类,记录了当前队列总共有多少元素
private final AtomicInteger count = new AtomicInteger();
//队列的头部
transient Node<E> head;
//队列的尾部
private transient Node<E> last;
//出列锁
private final ReentrantLock takeLock = new ReentrantLock();
//出列锁的Condition,用于暂停出列线程
private final Condition notEmpty = takeLock.newCondition();
//入列锁
private final ReentrantLock putLock = new ReentrantLock();
//入列锁的Condition,用于暂停入列线程
private final Condition notFull = putLock.newCondition();
构造方法
//无参构造,默认队列长度为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//传入一个int型整数作为队列的最大容量
public LinkedBlockingQueue(int capacity) {
//入参校验,小于0抛出异常
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化头尾数据,在队列没有数据时头和尾是同一个空对象
last = head = new Node<E>(null);
}
//传入一个实现了Collection接口的集合,将集合的数据导入到队列中
public LinkedBlockingQueue(Collection<? extends E> c) {
//首先调用一个参数的构造方法初始化
this(Integer.MAX_VALUE);
//获取入列锁
final ReentrantLock putLock = this.putLock;
//上锁
putLock.lock(); // Never contended, but necessary for visibility
//遍历集合将数据导入到队列
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
因为LinkedBlockingQueue底层是一个列表结构,不涉及到扩容,所以成员变量和构造方法总体上还是比较简单的,主要做了初始化了队列的容量以及锁和头尾数据。当时还能通过在构造方法中传入一个集合来将集合的数据导入队列
队列最主要的是插入和取出这两种操作。根据这些操作在队列达到最大容量或者为空的时候,有分为以下几种类型
操作类型 | Throws Exception | Special Value | Blocked | Timed out |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o,timeout,unit) |
取出 | take() | poll(timeout,unit) |
-
Throws Exception
:如果元素插入失败,则抛出异常,成功则返回true -
Special Value
:返回一个boolean类型的数据,如果是true则插入成功,如果为false则插入失败, -
Blocked
:在取出或插入元素时,如果队列满了,或者队列取空了,则阻塞当前操作的线程,直到当前队列中有空闲位置或者当前队列中有待取元素时继续执行 -
Timed out
:等待超时,在Blocked的情况下。如果等待超过了设定的时间,则线程不再等待,并且都返回一个null
主要方法
因为LinkedBlockingQueue作为一个队列,最主要的必然是入列和出列这些方法,接下来将从这些方法的源码展开。其他不常用的方法稍稍提及
1、add(E e)
往队列中添加一个元素,如果添加成功则返回true,如果添加失败则抛出异常
public boolean add(E e) {
//调用offer方法,如果添加成功则返回true
if (offer(e))
return true;
else
//否则添加失败抛出异常
throw new IllegalStateException("Queue full");
}
可以看到add(o)方法主要还是调用了offer(o)方法,对插入失败做了抛异常的处理
2、offer(E e)
往队列中添加一条数据,如果添加成功则返回true,失败返回false
public boolean offer(E e) {
//插入数据为null,抛出空指针异常
if (e == null) throw new NullPointerException();
//引用用于记录队列大小的原子类
final AtomicInteger count = this.count;
//如果队列装满了,则不能继续插入,返回false
if (count.get() == capacity)
return false;
//用于记录操作前队列的大小,预先设置为负数,
//最后根据正负判断是否插入成功
int c = -1;
//new一个新的节点
Node<E> node = new Node<E>(e);
//引用插入锁
final ReentrantLock putLock = this.putLock;
//上锁
putLock.lock();
try {
//如果队列还没有装满,就往链表中插入一条数据
if (count.get() < capacity) {
enqueue(node);
/**
队列大小自增1(注意这里如果不了解原
子类的朋友,getAndIncrement是先
取再自增,所以这里的c还是插入前的队
列大小)c = count.getAndIncrement();
(c+1代表插入后的队列大小)所以这里
的逻辑是如果当插入后的队列大小还未
达到最大容量的时候,去唤醒其他还在
等待插入的线程。这个问题在多线程中
会遇到,如果没有这个判断,在多线程
环境中会造成线 程假死。具体原因在
offer(o,timeout,unit)说完后分析*/
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}
/**如果插入前队列为空,则说明会有取出数据的
线程还在等待,此时队列中已经插入了一条数据
唤醒还在等待取出数据的线程可以继续从队列中
*/取出数据了
if (c == 0)
signalNotEmpty();
//因为队列长度预先设置为负数,如果插入成功则
//一定是判断是>=0的数,否则插入失败
return c >= 0;
}
//往链表中添加一条数据的方法
private void enqueue(Node<E> node) {
last = last.next = node;
}
//通知还在出列的线程可以继续从队列中拿数据了
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
作为一个链表,LinkedBlockingQueue的offer()入列方法还是比较容易理解的。在这里使用到的Lock锁和原子类因为不是本篇文章的重点。所以如果有不了解的朋友可以去看《Java多线程编程核心技术》这本书。
3、offer(E e, long timeout, TimeUnit unit)
往队列中添加一个元素,如果队列塞满了,则等待一段时间,如果在等待的这段时间内队列内的元素还是慢的则等待超时,返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
将传入的时间转换为纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//以上代码除了时间转换其他的和offer(E e)一样
/**这里是用lockInterruptibly()上锁的原因
是lockInterruptibly()方法在上锁的时候如果
当前线程中断则会抛异常,因为这个方法设置的超
时是用线程等待来实现的,如果在当前线程等待的
过程中突然中断了,则能把线程中断的状态响应给
调用方法的地方进行处理。而lock()上锁则不能
响应线程的中断状态*/
putLock.lockInterruptibly();
try {
//当队列塞满的时候
while (count.get() == capacity) {
//如果设插入置的等待超时时间<=0,就相当于不等待,直接返回插入失败
if (nanos <= 0)
return false;
/** 线程等待,这里的awaitNanos(nanos)方法执行后回返回
一个long类型的<=0的数。所以这里的awaitNanos(nanos)
方法实际上最多只会执行一次,下一次
会进入上面的if里面return掉,也就
达到了超时的效果,最终超时后会返回
false插入失败*/
nanos = notFull.awaitNanos(nanos);
}
//往链表中插入数据
enqueue(new Node<E>(e));
//下面的逻辑跟offer(E e)中的一样
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
offer(E e, long timeout, TimeUnit unit)和offer(E e)的逻辑差不多,只不过在队列塞满的情况下前者做的是等待超时,而后者返回false,这也导致了它们两者选择上锁方式上的不同
4、 put(E e)
往队列中添加一个元素,如果队列塞满了,则线程无限期等
待直到被唤醒
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/**唯一与offer(E e, long timeout,
TimeUnit unit)中不一样的是在队列塞满
的情况下这里执行的是线程无限期等待,直到
被唤醒,其他逻辑都是一样的*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
put()与其他入列方法最大的不同就是在队列塞满的时候线程会无限期等待。直到有其他线程在执行出列方法时队列有了空闲空间,就会唤醒put()方法继续执行入列动作
5、poll()
从队列中取出第一个元素并返回,同时将这个元素从队列中删除。如果取出成功则返回取出的元素,如果失败则返回null
public E poll() {
//引用原子类
final AtomicInteger count = this.count;
//当队列中没有数据时,啥也取不到,返回null
if (count.get() == 0)
return null;
//从队列中取出的第一个元素,预先设置成null
E x = null;
//队列长度,预先设置成负数
int c = -1;
//引用“取锁”
final ReentrantLock takeLock = this.takeLock;
//上锁
takeLock.lock();
try {
//队列中还有元素
if (count.get() > 0) {
//执行出列方法
x = dequeue();
//获取队列长度并自减
c = count.getAndDecrement();
//跟入列中的一样
if (c > 1)
notEmpty.signal();
}
} finally {
//释放锁
takeLock.unlock();
}
//与入列中的一样
if (c == capacity)
signalNotFull();
//返回取出的元素
return x;
}
//队列出列
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//通知还在入列的线程可以继续往队列中存数据了
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll()方法和offer(E e)是一对的,一入一出,一个判断队列是否塞满,一个判断队列是否取完。
6、poll(long timeout, TimeUnit unit)
从队列中取出第一个元素,并将该元素从队列中删除,取出成功返回被取出的元素,失败则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
这个方法和offer(E e, long timeout, TimeUnit unit)也是一对的,一存一取,并且在取出第一个元素后将该元素从队列中删除。一个在塞满是等待超时,一个在取完时等待超时
7、take()
从队列中取出第一个元素,并将该元素从队列中删除,如果队列中的元素取完了,则线程无限期等待,直到被其它线程唤醒
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
这个方法与put(E e)是一对的,一入一出,一个在队列塞满时无限期等待,一个在队列取完时无限期等待。
8、remove(Object o)
从队列中删除指定的元素
public boolean remove(Object o) {
if (o == null) return false;
//入列和出列同时上锁
fullyLock();
try {
//遍历链表,找到要删除的元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//找到了要删除的元素, if (o.equals(p.item)) {
//执行删除方法
unlink(p, trail);
//删除成功,返回true
return true;
}
}
return false;
} finally {
//释放锁
fullyUnlock();
}
}
由于LinkedBlockingQueue是链表结构,并且还是单向链表,所以remove方法会涉及到从头到尾的遍历,效率上来说是很低的
9、其他的一些方法
peek()
获取队列中的第一个元素,但是不删除
contains(Object o)
判断队列中是否包含要查找的元素,同样需要遍历
toArray()
将队列转换成数组
toArray(T[] a)
将队列转换成指定数据类型的数组
clear()
清空队列
drainTo(Collection<? super E> c)
将队列转换成实现Collection接口的集合。
drainTo(Collection<? super E> c, int maxElements)
将队列的指定长度的元素转换成实现Collection接口的集合,从头开始转换,
如果指定的长度大于或等于队列长度,则将队列所有元素都转换成集合。
iterator()
获取迭代器
spliterator()
获取分流迭代器(1.8新增)
remainingCapacity()
获取队列中的剩余容量
size()
获取队列的大小/长度
在offer(E e, long timeout, TimeUnit unit)方法中提到的在入列方法中唤醒入列的等待状态是因为这个方法和put()方法中都会出现线程等待。例如此时有三个线程ThreadA 、ThreadB和ThreadC,队列最大容量为5。ThreadA先执行offer(.........)方法往队列中塞6条数据,ThreadB执行put()方法往队列中塞2条数据,这时候由于队列最大容量为5,ThreadA中还有一条数据再等待插入,ThreadB()中2条数据都在等待插入,这时候ThreadC执行take()方法从队列中取两条数据并删除,这时候唤醒了ThreadA执行最后一条数据的插入,这时候因为ThreadC删除了2条数据,ThreadA再插入一条,最后队列中总共有4条数据,还能再插入一条数据,因为使用的是同一把锁,所以这时候就需要唤醒另一个Put()方法所在的线程。所以这就是上面方法中要加上那个判断逻辑的原因,否则上面例子put()方法本来还能再插入一条数据,现在就不能继续插入造成线程假死。
以上就是LinkedBlockingQueue常用的一些方法,以上Lock锁由于不在本篇文章的主讲范围,所以不做扩展,原子类是一个可以在多线程中进行原子操作的类,例如本篇文章中的链表自增,使用原子类在多线程中就不会出现线程安全问题。但是原子类只能保证单个方法中是原子的,不同方法之间还是非原子的,但是最终的结果还是安全的。
网友评论