假如我们现在有这样的需求,有一个仓库,我们可以存东西和取东西,仓库有存储上限。当仓库已满的时候,存东西的人就必须等待,直到有人取走东西。当仓库为空的时候,取东西的人必须等待,直到有人存入东西。
这是一个典型的生产者消费者问题。这里有两个条件,仓库已满的条件和仓库为空的条件,用条件让线程等待,这个让我们想到了并发框架下的Condition。
自己实现这个功能也不难,但是java中给我们提供了很好的实现,就是并发集合中的阻塞队列BlockingQueue。
// 阻塞队列的接口
public interface BlockingQueue<E> extends Queue<E> {}
阻塞队列BlockingQueue继承自Queue队列接口。而Queue接口我们在LinkedList(源码解析)文章中详细介绍过了,这里先回顾一下。
一. Queue接口
队列是一种FIFO(先入先出)的数据结构,也就是说每次插入元素都是插入在队列尾,每次取出元素都是在队列头取出。
所以队列应该有三个重要方法:
- boolean offer(E e); 向队列尾添加元素。
- E poll(); 移除队列头元素,并返回它。
- E peek(); 查看队列头元素。
下面我们来看一下Queue接口的代码
public interface Queue<E> extends Collection<E> {
// 向队列末尾新添加元素,返回true表示添加成功
// 不会返回false,因为添加失败直接抛出IllegalStateException异常。
// 一般调用offer方法实现。
boolean add(E e);
// 向队列末尾新添加元素,返回true表示添加成功,返回false,添加失败
boolean offer(E e);
// 这个与Collection中的remove方法不一样,因为Collection中的remove方法都要提供一个元素或者集合,用于删除。
// 这里不穿任何参数,就是代表删除队列第一个元素(即队列头),并返回它
// 还需要注意的时,如果队列是空的,即队列头是null,这个方法会抛出NoSuchElementException异常。
E remove();
// 这个方法也是删除队列第一个元素(即队列头),并返回它
// 但是它和remove()方法不同的时,如果队列是空的,即队列头是null,它不会抛出异常,而是会返回null。
E poll();
// 查看队列头的元素,如果队列是空的,就抛出异常
E element();
// 查看队列头的元素。如果队列是空的,不会抛出异常,而是返回null
E peek();
}
可以看出继承自Collection接口,但是Queue接口还提供了是三个好像重复的方法。
- 向队列尾添加元素的方法:add(E e)与offer(E e)。区别就是队列是满的,添加失败时,add方法会抛出异常,而offer方法只会返回false。
- 移除队列头元素的方法:remove()与poll()。区别就是队列为空的时候,remove方法会抛出异常,poll方法只会返回null。
- 查看队列头元素的方法:element()与peek()。区别就是队列为空的时候,element方法会抛出异常,peek方法只会返回null。
二. BlockingQueue接口
阻塞队列BlockingQueue与普通队列Queue相比较,不同有两点:
- 当队列已满的时候,当前线程会阻塞等待,直到队列有空余位置,当前线程会被唤醒,将元素插入队列尾。
- 当队列为空的时候,当前线程会阻塞等待,直到队列中有元素,当前线程会被唤醒,从队列头取出元素。
在BlockingQueue中有这样两个方法:
// 向队列末尾新添加元素,如果队列已满,当前线程就等待。响应中断异常
void put(E e) throws InterruptedException;
/**
* 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。响应中断异常
*/
E take() throws InterruptedException;
但是有这样一个问题,当队列已满的时候,当前线程会阻塞等待,它只能祈祷有别的线程取出队列中的元素,如果一直没有线程取走元素,那么当前线程就会一直阻塞等待,整个过程对当前线程是不可控的。
如果我们希望对等待时间有一定的控制,不能出现一直等待的现象,那么就要增加超时时间,所以要添加有超时时间的对应方法。
/**
* 向队列末尾新添加元素,如果队列中没有可用空间,当前线程就等待,
* 如果等待时间超过timeout了,那么返回false,表示添加失败
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。
* 如果等待时间超过timeout了,那么返回false,表示获取元素失败
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
所以与Queue接口进行对比,BlockingQueue阻塞队列的插入、移除和查看的方法不同:
-
向队列尾添加元素的方法:
1). add(E e)与offer(E e)方法,和Queue接口作用一样,只不过加了多线程同步安全的操作。
2). put(E e)方法,添加元素时,当前线程可能会阻塞等待。
3). offer(E e, long timeout, TimeUnit unit)方法,与put(E e)方法作用相同,只不过加了超时时间。(其实这个方法名用put更好,用offer容易让人和offer(E e)方法产生混淆) -
移除队列头元素的方法:
1). remove()与poll()方法,和Queue接口作用一样,只不过加了多线程同步安全的操作。(注:这个没有在BlockingQueue接口中重新声明)
2). take()方法,移除元素时,当前线程可能会阻塞等待。
3). poll(long timeout, TimeUnit unit) 方法,与take()方法作用相同,只不过加了超时时间。(其实这个方法名用take更好,用poll容易让人和poll()方法产生混淆) -
查看队列头元素的方法:BlockingQueue接口没有添加新的关于查看队列头的新方法,因为查看元素不会改变队列,所以不需要增加阻塞的方法。还是element()与peek()这两个方法。
package java.util.concurrent;
import java.util.Collection;
import java.util.Queue;
// 阻塞队列的接口
public interface BlockingQueue<E> extends Queue<E> {
/**
* 向队列尾添加元素。如果队列已满,添加失败,抛出IllegalStateException异常
*/
boolean add(E e);
// 向队列末尾新添加元素。返回true表示添加成功,false表示添加失败,不会抛出异常
boolean offer(E e);
// 向队列末尾新添加元素,如果队列已满,当前线程就等待。响应中断异常
void put(E e) throws InterruptedException;
/**
* 向队列末尾新添加元素,如果队列中没有可用空间,当前线程就等待,
* 如果等待时间超过timeout了,那么返回false,表示添加失败
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。响应中断异常
*/
E take() throws InterruptedException;
/**
* 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。
* 如果等待时间超过timeout了,那么返回false,表示获取元素失败
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 返回队列剩余空间的个数。(即还能添加多少个元素)
int remainingCapacity();
// 移除队列中包含的所有o对象。返回true表示队列有变化,即删除了元素,返回false表示没有删除任何元素。
boolean remove(Object o);
// 队列中是否包含对象o
public boolean contains(Object o);
/**
* 移除阻塞队列中所有有效元素,把它们添加到另一个集合c中。返回值转移的元素个数。
* @param c
* @return 实际转移的元素个数。
*/
int drainTo(Collection<? super E> c);
/**
* 最多移除阻塞队列中maxElements个数的有效元素,把它们添加到另一个集合c中。
* 返回值实际转移的元素个数。
* @param c
* @param maxElements 最多添加的元素个数
* @return 实际转移的元素个数。
*/
int drainTo(Collection<? super E> c, int maxElements);
}
网友评论