美文网首页
Java 并发编程之 LinkedBlockingQueue

Java 并发编程之 LinkedBlockingQueue

作者: Tinyspot | 来源:发表于2023-04-12 23:47 被阅读0次

1. LinkedBlockingQueue

  • 基于单向链表实现的阻塞队列
  • 属于有界阻塞队列

1.1 生产-消费模型

生产者生产数据到队列,队列满时需要阻塞线程,停止往队列生产
消费者消费队列,队列为空时阻塞线程停止消费

2. 源码分析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

}

2.1 成员变量

  • capacity 链表长度
  • count 队列元素数量
  • head / last 链表的头结点和尾节点(头结点出队,尾节点入队)
    • takeLock 消费锁,出队阻塞
    • putLock 生产锁,入队阻塞
  • notEmpty 和 notFull 是条件变量
    • notEmpty 保证消费等待,数据队列空了线程进入等待
    • notFull 保证生产等待,数据队列满了线程进入等待
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

}

2.2 链表节点

  • 元素 item
  • 后继节点next
static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) { item = x; }
}

3. 出队和入队

方法 抛异常 有返回值 阻塞 阻塞特定时间
入队 add(e) offer(e) put(e) offer(e, time, unit)
出队 remove() poll() take() poll(time, unit)
获取队首元素 element() peek() 不支持 不支持

3.1 阻塞入队 put()

向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程,直到队列有空闲插入成功后返回

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();

    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * 若队列满则等待(阻塞当前线程)
         * 当前线程被加入 notFull 条件的等待队列中
         */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal(); // 唤醒一个等待入队的线程
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

3.2 非阻塞入队 offer()

  • boolean offer(E e)
    • 队列中有空闲则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false
  • boolean offer(E e, long timeout, TimeUnit unit)
/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning true upon success and false if this queue is full.
 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary up to the specified wait time for space to become available.
 *
 * @return true if successful, or false if
 *         the specified waiting time elapses before space is available
 */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException { // ... }

3.3 阻塞出队 take()

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lockInterruptibly();
    try {
        // 阻塞挂起当前线程,并把当前线程放入 notEmpty 条件队列
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

3.4 非阻塞出队 poll()

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

3.5 出队 remove()

public boolean remove(Object o) {
    if (o == null) return false;
    // (1) 双重加锁
    fullyLock();
    try {
        // (2) 遍历队列
        for (Node<E> trail = head, p = trail.next;  p != null;  trail = p, p = p.next) {
            if (o.equals(p.item)) {
                // (3) 
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // (4) 释放锁
        fullyUnlock();
    }
}

相关文章

网友评论

      本文标题:Java 并发编程之 LinkedBlockingQueue

      本文链接:https://www.haomeiwen.com/subject/jgpoddtx.html