本文会说明java中的阻塞队列, 这是一个多线程条件下常用的数据结构. 题目中的类名实际上都是接口, 其中BlockingQueue
接口定义了阻塞队列的基本操作, TransferQueue
是另外一个继承了BlockingQueue
的接口, 适用于一些特定环境; DelayQueue
是用于存储Delay
类型的队列. 上述类和接口都会在本文进行说明.
简单来说, 阻塞队列实现了这样一条规则: 当向满队列中添加元素或者从空队列中取出元素时, 操作者会进入阻塞状态, 直到队列有空位或放入元素. 当然阻塞队列也实现了队列的线程安全特性. 和其他很多为并发操作设计的类或接口一样, 理解其意义比了解方法更重要一些.
BlockingQueue
public interface BlockingQueue<E> extends Queue<E>
BlockingQueue声明的方法
- Queue<E>接口中的方法, 这里略去;
- 新方法
//这个方法肯定是要加锁的所以请不要考虑"如果其他线程修改了队列怎么办"这样的问题
boolean offer(E e) //把元素放入队列, 无等待时间
boolean offer(E e, long timeout, TimeUnit unit) //在等待时间内把元素放入队列
void put(E e) //一直阻塞, 直到有空位可以放入队列
E poll() //同上
E poll(long timeout, TimeUnit unit)
E take() //一直阻塞, 直到可以获取队列元素
int drainTo(Collection<? super E> c) //所有元素出列, 并放进集合c里
int drainTo(Collection<? super E> c, int max) //max个元素出列, 并放进集合c里
int remainingCapacity()
BlockingQueue的实现类
实现类有ArrayBlockingQueue
, ListBlockingQueue
和PriorityBlockingQueue
, 其中的方法和它们的非线程安全的版本都是相同的, 请自行参考相关文章或源码吧.
当然BlockingQueue也有双向队列的版本, 即
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E>
这里也不再说明.
DelayQueue
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
首先要说明一下泛型<E extends Delayed>
中Delayed
接口是什么
public interface Delayed extends Comparable<Delayed> {
long getDelay(Timeunit) //获取对应时间单位对应的时长
int compareTo(Delayed) //继承Comparable接口的方法, 所以源码中省略了
}
java文档对Delayed接口说明是: "A mix-in style interface for marking objects that should be acted upon after a given delay."
以DelayQueue为例, 当它需要对队列中的元素进行相关操作时(如设置倒计时), 就会调用getDelay
方法, 其返回值就是对该元素延迟操作的时间; 因此我们在实现这个接口的时候, 需要有方法能够传入延迟时间, 然后实现getDelay
方法得到这个时间.
需要指出, 在jdk中这个接口并没有public的实现类.
借用一篇文章的说法: "(一个线程)要实现Delayed接口的getDelay()和compareTo()方法,放入DelayQueue队列后,通过take()方法取出时,可根据compareTo()方法制定的顺序来优先取出线程执行".
DelayQueue<E extends Delay>的方法
- 构造方法
DelayQueue()
DelayQueue(Collection<? extends E>)
- DelayQueue对象中的方法都是继承自父类或接口, 没有添加新的方法, 因此略去.
DelayQueue的意义
这个类该怎么用? 什么时候用呢?
根据文档的说法, DelayQueue用于放置实现了Delayed接口的对象, 其中的对象只能在其到期时才能从队列中取走.
或者说队列中的元素在到期之前对我们是不可见的.
再或者说... DelayQueue只是一个容器, 而其中到期的元素构成了一个普通的队列.
我觉得这样应该足够好理解了吧, 可以看一下其他文章的例子, 虽然我觉得这些例子怎么都那么别扭呢.
TransferQueue<E>
public interface TransferQueue<E> extends BlockingQueue<E>
TransferQueue只有一个实现类, 是LinkedTransferQueue
.
自己看java文档还是不太明白这个接口是干什么用的, 然后从网上找了一些文章参考. 概括来说, 这个接口除了继承了BlockingQueue, 还实现了"单一队列"的功能, 即队列中只能放入一个元素, 直到该元素取走才能放入下一个. 该功能通过transfer
等方法实现.
TransferQueue新增的方法
在BlockingQueue基础上, TransferQueue新增了下列方法
void transfer(E) //向队列添加一个元素, 然后该线程阻塞直到其他线程取走元素
//如果有消费者线程等待取走元素, 则添加至队列, 返回true, 否则返回false
boolean tryTransfer(E)
boolean tryTransfer(E, long, TimeUnit)
boolean hasWaitingConsumer()
int getWaitingConsumerCount()
可以看出, TransferQueue的方法都是对于生产者来说的, 作为消费者使用这个线程的方法和其他阻塞线程是一样的.
SynchronousQueue
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
在说明TransferQueue
之后, 有必要顺带提一下SynchronousQueue
. SynchronousQueue
是java1.5新增的, 而TransferQueue
是1.7新增的. 一般来说我们现在完全可以用TransferQueue
代替SynchronousQueue
.
Synchronous是同步的意思(Asynchronous是异步). 这个类所实现的功能和TransferQueue
是一样的, 即每次入队列操作都处于阻塞状态, 直到其他线程取走队列元素. 和TransferQueue
不同的是, TransferQueue
额外使用了transfer
方法完成这个功能, 但是offer
和put
方法的含义(即文档说明)是继承自BlockingQueue
的; 而SynchronousQueue
则是改写了offer
方法和put
方法.
实际上, SynchronousQueue中是这样解释offer
和put
方法的:
//Inserts the specified element into this queue, if another thread is waiting to receive it.
//如果有消费者正在等待, 则元素入队列并返回true, 否则返回false;
boolean offer(E)
//Adds the specified element to this queue, waiting if necessary for another thread to receive it.
//元素入队列, 进入阻塞状态, 直到有消费者取走该元素.
void put(E) throws InterruptedException
可以看出, TransferQueue的灵活性要更大一些.
网友评论