Java集合--阻塞队列(LinkedBlockingQueue

作者: 贾博岩 | 来源:发表于2017-12-10 23:03 被阅读640次

    1. LinkedBlockingQueue

    上篇中,说到了ArrayBlockingQueue阻塞队列。在ArrayBlockingQueue中,底层使用了数组结构来实现。

    那么,提到数组了就不得不提及链表。作为两对成双成对的老冤家,链表也可以实现阻塞队列。

    下面,就让我们进入今天的正题LinkedBlockingQueue!!!!

    LinkedBlockingQueue是一个使用链表实现的阻塞队列,支持多线程并发操作,可保证数据的一致性。

    与ArrayBlockingQueue相同的是,LinkedBlockingQueue也实现了元素“先进先出(FIFO)”规则,也使用ReentrantLock来保证数据的一致性;

    与ArrayBlockingQueue不同的是,LinkedBlockingQueue通常被认为是“无界”的,在默认情况下LinkedBlockingQueue的链表长度为Integer.MAX_VALUE。

    下面,我们就对LinkedBlockingQueue的原理具体分析分析!

    (1)成员变量

    对于ArrayBlockingQueue来说,当队列在进行入队和出队时,永远只能有一个操作被执行。因为该队列只有一把锁,所以在多线程执行中并不允许同时出队和入队。

    与ArrayBlockingQueue不同的是,LinkedBlockingQueue拥有两把锁,一把读锁,一把写锁,可以在多线程情况下,满足同时出队和入队的操作。

    在ArrayBlockingQueue中,由于出队入队使用了同一把锁,无论元素增加还是减少,都不会影响到队列元素数量的统计,所以使用了int类型的变量作为队列数量统计。

    但是,在LinkedBlockingQueue中则不同。上面说了,在LinkedBlockingQueue中使用了2把锁,在同时出队入队时,都会涉及到对元素数量的并发修改,会有线程安全的问题。因此,在LinkedBlockingQueue中使用了原子操作类AtomicInteger,底层使用CAS(compare and set)来解决数据安全问题。

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        
        //队列容量大小,默认为Integer.MAX_VALUE
        private final int capacity;
    
        //队列中元素个数:(与ArrayBlockingQueue的不同)
        //出队和入队是两把锁
        private final AtomicInteger count = new AtomicInteger(0);
    
        //队列--头结点
        private transient Node<E> head;
    
        //队列--尾结点
        private transient Node<E> last;
    
        //与ArrayBlockingQueue的不同,两把锁
        //读取锁
        private final ReentrantLock takeLock = new ReentrantLock();
    
        //出队等待条件
        private final Condition notEmpty = takeLock.newCondition();
    
        //插入锁
        private final ReentrantLock putLock = new ReentrantLock();
    
        //入队等待条件
        private final Condition notFull = putLock.newCondition();
    }
    

    (2)链表结点

    由于LinkedBlockingQueue是链表结构,所以必然会有结点存在。

    结点中,保存这元素的值,以及本结点指向下一个结点的指针。

    //队列存储元素的结点(链表结点):
    static class Node<E> {
        //队列元素:
        E item;
    
        //链表中指向的下一个结点
        Node<E> next;
    
        //结点构造:
        Node(E x) { item = x; }
    }
    

    (3)构造函数

    之前,我们说了LinkedBlockingQueue可以称为是无界队列,为什么是无界的,就是因为LinkedBlockingQueue的默认构造函数中,指定的队列大小为Integer.MAX_VALUE = 2147483647,想必没有哪个应用程序能达到这个数量。

    在初始化中,LinkedBlockingQueue的头尾结点中的元素被置为null;

    //默认构造函数:
    public LinkedBlockingQueue() {
        //默认队列长度为Integer.MAX_VALUE
        this(Integer.MAX_VALUE);
    }
    
    //指定队列长度的构造函数:
    public LinkedBlockingQueue(int capacity) {
        //初始化链表长度不能为0
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        //设置头尾结点,元素为null
        last = head = new Node<E>(null);
    }
    

    (4)插入元素(入队)

    LinkedBlockingQueue的插入获取和ArrayBlockingQueue基本类似,都包含有阻塞式和非阻塞式。

    put(E e)是阻塞式插入,如果队列中的元素与链表长度相同,则此线程等待,直到有空余空间时,才执行。

    //向队列尾部插入元素:队列满了线程等待
    public void put(E e) throws InterruptedException {
        //不能插入为null元素:
        if (e == null) throw new NullPointerException();
        int c = -1;
        //创建元素结点:
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //加插入锁,保证数据的一致性:
        putLock.lockInterruptibly();
        try {
            //当队列元素个数==链表长度
            while (count.get() == capacity) {
                //插入线程等待:
                notFull.await();
            }
            //插入元素:
            enqueue(node);
            //队列元素增加:count+1,但返回+1前的count值:
            c = count.getAndIncrement();
            //容量还没满,唤醒生产者线程
            // (例如链表长度为5,此时第五个元素已经插入,c=4,+1=5,所以超过了队列容量,则不会再唤醒生产者线程)
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁:
            putLock.unlock();
        }
        //当c=0时,即意味着之前的队列是空队列,消费者线程都处于等待状态,需要被唤醒进行消费
        if (c == 0)
            //唤醒消费者线程:
            signalNotEmpty();
    }
    

    offer(E e)是非阻塞式插入,队列中的元素与链表长度相同时,直接返回false,不会阻塞线程。

    //向队列尾部插入元素:返回true/false
    public boolean offer(E e) {
        //插入元素不能为空
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //如果队列元素==链表长度,则直接返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        //创建元素结点对象:
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        //加锁,保证数据一致性
        putLock.lock();
        try {
            //队列元素个数 小于 链表长度
            if (count.get() < capacity) {
                //向队列中插入元素:
                enqueue(node);
                //增加队列元素个数:
                c = count.getAndIncrement();
                //容量还没满,唤醒生产者线程:
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //释放锁:
            putLock.unlock();
        }
        //此时,代表队列中还有一条数据,可以进行消费,唤醒消费者线程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    

    (5)获取元素(出队)

    take():阻塞式出队,获取队列头部元素,如果队列中没有元素,则此线程的等待,直到队列中有元素才执行。

    //从队列头部获取元素,并返回。队列为null,则一直等待
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //设置读取锁:
        takeLock.lockInterruptibly();
        try {
            //如果此时队列为空,则获取线程等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            //从队列头部获取元素:
            x = dequeue();
            //减少队列元素-1,返回count减少前的值;
            c = count.getAndDecrement();
            //队列中还有可以消费的元素,唤醒其他消费者线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            //释放锁:
            takeLock.unlock();
        }
        //队列中出现了空余元素,唤醒生产者进行生产。
        // (链表长度为5,队列在执行take前有5个元素,执行到此处时候有4个元素了,但是c的值还是5,所以会进入到if中来)
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    poll():非阻塞式出队,当队列中没有元素,则返回null.

    //获取头部元素,并返回。队列为空,则直接返回null
    public E poll() {
        final AtomicInteger count = this.count;
        //如果队列中还没有元素,则直接返回 null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        //加锁,保证数据的安全
        takeLock.lock();
        try {
            //此时在判断,队列元素是否大于0
            if (count.get() > 0) {
                //移除队头元素
                x = dequeue();
                //减少队列元素个数
                c = count.getAndDecrement();
                //此时队列中,还有1个元素,唤醒消费者线程继续执行
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            //释放锁:
            takeLock.unlock();
        }
        //队列中出现了空余元素,唤醒生产者进行生产。
        // (链表长度为5,队列在执行take前有5个元素,执行到此处时候有4个元素了,但是c的值还是5,所以会进入到if中来)
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    (6)出队入队图片介绍

    入队

    入队操作,很简单,就是向链表中逐个插入新的元素!

    [图片上传失败...(image-cf7641-1512918370979)]

    首先,将最后的结点指向新插入的结点,其次将last结点置为新插入的结点,流程结束!

    出队

    相比于入队来说,出队的情况要复杂一点点!

    但是,请记住一点,就是头部元素永远为null!

    [图片上传失败...(image-a955df-1512918370979)]

    首先,将头部元素的指向下一个结点的引用,只向自己,主要为了GC的快速清理!

    再将,队列中的第一个元素变成头结点,而头结点又保有永远为null的属性,则将头结点元素置为null,也就是出队操作!

    (7)ArrayBlockingQueue与LinkedBlockingQueue对比

    ArrayBlockingQueue底层基于数组实现,需要使用者指定队列长度,是一个不折不扣的有界队列。

    LinkedBlockingQueue底层基于链表实现,无需使用者指定队列长度(可自定义),当使用默认大小时候,是一个无界队列。

    ArrayBlockingQueue由于默认必须设置队列长度,所以在使用时会能更好的预测系统性能;而LinkedBlockingQueue默认无参构造,无需指定队列长度,所以在使用时一定要多加注意,当队列中元素短时间内暴增时,可能会对系统产生灾难性影响。

    但是,LinkedBlockingQueue的一大优点也是ArrayBlockingQueue所不具备的,那么就是在多个CPU的情况下,LinkedBlockingQueue可以做到同一时刻既消费、又生产。故LinkedBlockingQueue的性能也要优于ArrayBlockingQueue。

    (8)生产者消费者实现

    使用LinkedBlockingQueue简单模拟消费者生产者实现;

    public class LinkedBlockingQueueTest {
    
        static class Apple{
    
            String colour;
    
            public Apple(String colour){
                this.colour = colour;
            }
    
            public String getColour() {
                return colour;
            }
    
            public void setColour(String colour) {
                this.colour = colour;
            }
        }
    
        //生产者
        static class Producer implements Runnable{
    
            LinkedBlockingQueue<Apple> queueProducer ;
    
            Apple apple;
    
            public Producer( LinkedBlockingQueue<Apple> queueProducer,Apple apple){
                this.queueProducer = queueProducer;
                this.apple = apple;
            }
    
            public void run() {
                try {
                    System.out.println("生产"+apple.getColour()+"的苹果");
                    queueProducer.put(apple);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        //消费者
        static class Consumer implements Runnable{
    
            LinkedBlockingQueue<Apple> queueConsumer ;
    
            public Consumer(LinkedBlockingQueue<Apple> queueConsumer){
                this.queueConsumer = queueConsumer;
            }
    
            public void run() {
                try {
                    Apple apple = queueConsumer.take();
                    System.out.println("消费"+apple.getColour()+"的苹果");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<Apple> queue = new LinkedBlockingQueue<Apple>();
    
            Apple appleRed = new Apple("红色");
            Apple appleGreen = new Apple("绿色");
    
            Producer producer1 = new Producer(queue,appleRed);
    
            Producer producer2 = new Producer(queue,appleGreen);
    
            Consumer consumer = new Consumer(queue);
    
            producer1.run();
            producer2.run();
            consumer.run();
    
            Thread.sleep(10000);
        }
    }
    

    相关文章

      网友评论

        本文标题:Java集合--阻塞队列(LinkedBlockingQueue

        本文链接:https://www.haomeiwen.com/subject/lgrbixtx.html