美文网首页
Java 并发编程之 LinkedBlockingQueue

Java 并发编程之 LinkedBlockingQueue

作者: Tinyspot | 来源:发表于2023-04-12 23:47 被阅读0次

    1. LinkedBlockingQueue

    • 基于单向链表实现的阻塞队列
    • 属于有界阻塞队列

    1.1 生产-消费模型

    生产者生产数据到队列,队列满时需要阻塞线程,停止往队列生产
    消费者消费队列,队列为空时阻塞线程停止消费

    2. 源码分析

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
    }
    

    2.1 成员变量

    • capacity 链表长度
    • count 队列元素数量
    • head / last 链表的头结点和尾节点(头结点出队,尾节点入队)
      • takeLock 消费锁,出队阻塞
      • putLock 生产锁,入队阻塞
    • notEmpty 和 notFull 是条件变量
      • notEmpty 保证消费等待,数据队列空了线程进入等待
      • notFull 保证生产等待,数据队列满了线程进入等待
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        /** The capacity bound, or Integer.MAX_VALUE if none */
        private final int capacity;
    
        /** Current number of elements */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();
    
    }
    

    2.2 链表节点

    • 元素 item
    • 后继节点next
    static class Node<E> {
        E item;
        Node<E> next;
    
        Node(E x) { item = x; }
    }
    

    3. 出队和入队

    方法 抛异常 有返回值 阻塞 阻塞特定时间
    入队 add(e) offer(e) put(e) offer(e, time, unit)
    出队 remove() poll() take() poll(time, unit)
    获取队首元素 element() peek() 不支持 不支持

    3.1 阻塞入队 put()

    向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程,直到队列有空闲插入成功后返回

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
    
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * 若队列满则等待(阻塞当前线程)
             * 当前线程被加入 notFull 条件的等待队列中
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal(); // 唤醒一个等待入队的线程
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    

    3.2 非阻塞入队 offer()

    • boolean offer(E e)
      • 队列中有空闲则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false
    • boolean offer(E e, long timeout, TimeUnit unit)
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning true upon success and false if this queue is full.
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return true if successful, or false if
     *         the specified waiting time elapses before space is available
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException { // ... }
    

    3.3 阻塞出队 take()

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 加锁
        takeLock.lockInterruptibly();
        try {
            // 阻塞挂起当前线程,并把当前线程放入 notEmpty 条件队列
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    3.4 非阻塞出队 poll()

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    3.5 出队 remove()

    public boolean remove(Object o) {
        if (o == null) return false;
        // (1) 双重加锁
        fullyLock();
        try {
            // (2) 遍历队列
            for (Node<E> trail = head, p = trail.next;  p != null;  trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    // (3) 
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            // (4) 释放锁
            fullyUnlock();
        }
    }
    

    相关文章

      网友评论

          本文标题:Java 并发编程之 LinkedBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/jgpoddtx.html