美文网首页java并发相关java开发
java阻塞队列 LinkedBlockingQueue

java阻塞队列 LinkedBlockingQueue

作者: 韭菜待收割 | 来源:发表于2018-09-10 19:01 被阅读8次
    java.util.concurrent
    //由单向链表结构组成的有界阻塞队列
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable
    
    
    //内部维护的链表节点结构Node
    static class Node<E> { 
      E item;//入队元素 
      Node<E> next;//后继节点 
      Node(E x) { 
        item = x; 
      } 
    } 
    
    //入队锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //出队锁
    private final ReentrantLock putLock = new ReentrantLock();
    

    1、常用方法

    构造方法

    /**
     * 在不指定时容量时为Integer.MAX_VALUE
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
    

    父类AbstractQueue实现

    /**
     * 增加一个元索,如果队列已满,则抛出异常
     */
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    
    /**
     * 移除并返回队列头部的元素,如果队列为空,则抛出异常
     */
    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
    
    /**
     * 返回队列头部的元素,如果队列为空,则抛出异常
     */
    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
    

    非阻塞方法

    /**
     * 添加一个元素并返回true,如果队列已满,则返回false
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                //队列数据总数自增+1后返回,返回的是未+1的值
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    //唤醒非满等待队列上的线程
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //队列中刚好有一个数据,唤醒非空等待队列 
            signalNotEmpty();
        return c >= 0;
    }
    
    /**
     * 移除并返问队列头部的元素,如果队列为空,则返回null
     * */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)//出队后队列还是非空
                    //唤醒非空等待队列上的线程
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            //唤醒非满等待队列上的线程 
            signalNotFull();
        return x;
    }
    
    /**
     * 返回队列头部的元素,如果队列为空,则返回null
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //返回head的next的item,因为head和tail的item是Null
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
    

    阻塞方法

    /**
     * 添加一个元素,如果队列满,则阻塞
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//可被线程中断
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)//队列未满
                notFull.signal();//唤醒非满等待队列上的线程
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //队列上只有一个元素,唤醒非空等待队列上的线程
            signalNotEmpty();
    }
    
    
    /**
     * 移除并返回队列头部的元素,如果队列为空,则阻塞
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可被线程中断
        try {
            while (count.get() == 0) {
                notEmpty.await();//休眠非空等待队列上的线程
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();//唤醒非空等待队列上的线程 
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            //唤醒非满等待队列上的线程
            signalNotFull();
        return x;
    }
    

    其它

    /**
     * 没有遍历整个队列
     */
    public int size() {
        return count.get();
    }
    

    2、入队出队操作

    当队列添加元素后:
    *会调用notFull.signal(); 唤醒非满等待队列上的线程;
    *如果队列之前刚好是空的,会调用signalNotEmpty(); 唤醒非空等待队列上的线程。

    当队列删除元素后:
    *如果出队后队列还是非空,会调用notEmpty.signal(); 唤醒非空等待队列上的线程;
    *如果队列之前刚好是满的,会调用signalNotFull(); 唤醒非满等待队列上的线程 。

    相关文章

      网友评论

        本文标题:java阻塞队列 LinkedBlockingQueue

        本文链接:https://www.haomeiwen.com/subject/cqkegftx.html