美文网首页
BlockingQueue简析

BlockingQueue简析

作者: lipy_ | 来源:发表于2017-03-15 18:08 被阅读0次

BlockingQueue

BlockingQueue是一个阻塞队列。在高并发场景是用得非常多的,在线程池中。如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去。

核心方法

放入数据

add(object)

队列没满的话,放入成功。否则抛出异常。

offer(object)

表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

offer(E o, long timeout, TimeUnit unit)

可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

put(object)

把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续.

获取数据

poll(time)

取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

poll(long timeout, TimeUnit unit)

从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

take()

取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

drainTo()

一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

线程池中对BlockingQueue的两个实现

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。
当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

此队列的默认和最大长度为Integer.MAX_VALUE。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

此队列按照先进先出的原则对元素进行排序

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

LinkedBlockingQueue数据是放在一个Node节点中

/**
 * Linked list node class
 */
static class Node<E> {
    E item;

    Node<E> next;

    Node(E x) { item = x; }
}

LinkedBlockingQueue有两个锁,一个放入锁,一个取得锁。分别对应放入元素和取得元素时的操作。这是由链表的结构所确定的。但是删除一个元素时,要同时获得放入锁和取得锁。

阻塞等待放入

public void put(E e) throws InterruptedException {  
    if (e == null) throw new NullPointerException();  
    int c = -1;  
    Node<E> node = new Node<E>(e);  
    final ReentrantLock putLock = this.putLock;  
    final AtomicInteger count = this.count;  
    putLock.lockInterruptibly(); //取得放入锁  
    try {  
        while (count.get() == capacity) {//队列已满  
            notFull.await();  
       }  
       enqueue(node);//入队  
       c = count.getAndIncrement();//当前队列中元素个数加1  
       if (c + 1 < capacity)  
           notFull.signal();  
    } finally {  
        putLock.unlock();  
    }  
    if (c == 0)  
        signalNotEmpty();  
}  

取队头元素。没有的话返回null,有的话返回元素,并将队列中删除此元素

public E poll() {  
    final AtomicInteger count = this.count;  
    if (count.get() == 0)  
        return null;  
    E x = null;  
    int c = -1;  
    final ReentrantLock takeLock = this.takeLock;  
    takeLock.lock();//获得取得锁  
    try {  
        if (count.get() > 0) {  
            x = dequeue();//出队  
            c = count.getAndDecrement();//当前队列中元素个数减去1  
            if (c > 1)  
                notEmpty.signal();//不为空条件成功  
        }  
    } finally {  
        takeLock.unlock();  
    }  
    if (c == capacity)  
        signalNotFull();  
    return x;  
}  

删除时要同时取得放入锁和取得锁

public boolean remove(Object o) {  
    if (o == null) return false;  
    fullyLock();//同时取得放入锁和取得锁  
    try {  
        for (Node<E> trail = head, p = trail.next;  
             p != null;  
             trail = p, p = p.next) {  
            if (o.equals(p.item)) {  
                unlink(p, trail);  
                return true;  
            }  
        }  
        return false;  
    } finally {  
        fullyUnlock();  
    }  
}  

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

SynchronousQueue

  1. 容量为0,无论何时 size方法总是返回0
  2. put操作阻塞, 直到另外一个线程取走队列的元素。
  3. take操作阻塞,直到另外的线程put某个元素到队列中。
  4. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素

构造方法上接收boolean参数,表示这是一个公平的基于队列的排队模式,还是一个非公平的基于栈的排队模式。

public SynchronousQueue(boolean fair) {  
    transferer = fair ? new TransferQueue() : new TransferStack();  
} 

公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者

相关文章

网友评论

      本文标题:BlockingQueue简析

      本文链接:https://www.haomeiwen.com/subject/gbcrnttx.html