美文网首页
java中的阻塞队列

java中的阻塞队列

作者: FunnyBuff | 来源:发表于2021-06-14 18:33 被阅读0次

    前言

    在去年的面试过程中,被面试官问道“阻塞队列”这个问题,因为当时并没有对此问题进行深入理解,只是按照自己的理解说明了该问题,最后面试结果也不太好,今天对该问题进行简要的面试并记录如下;如有错误,欢迎指正。

    什么是阻塞队列

    在数据结构中,队列遵循FIFO(先进先出)原则。在java中,Queue接口定义了定义了基本行为,由子类完成实现,常见的队列有ArrayDequeLinkedList等,这些都是非线程安全的,在java 1.5中新增了阻塞队列,当队列满时,添加元素的线程呈阻塞状态;当队列为空时,获取元素的线程呈阻塞状态。

    生产者、消费者模型

    微信截图_20210614125821.png

    生产者将元素添加到队列中,消费中获取数据后完成数据处理。两者通过队列解决了生产者和消费者的耦合关系;当生产者的生产速度与消费者的消费速度不一致时,可以通过大道缓冲的目的。

    阻塞队列的使用场景

    1. 线程池

      在线程池中,当工作线程数大于等于corePoolSize时,后续的任务后添加到阻塞队列中;

    目前有那些阻塞队列

    在java中,BlockingQueue接口定义了阻塞队列的行为,常用子类是ArrayBlockingQueueLinkedBlockingQueue

    ArrayBlockingQueue.png

    BlockingQueue继承了Queue接口,拥有其全部特性。在BlockingQueuejava doc中对其中的操作方法做了汇总

    微信截图_20210614132620.png
    • 插入元素
      • add(e):当队列已满时,再添加元素会抛出异常IllegalStateException
      • offer(e):添加成功,返回true,否则返回false
      • put:(e):当队列已满时,再添加元素会使线程变为阻塞状态
      • offer(e, time,unit):当队列已满时,在末尾添加数据,如果在指定时间内没有添加成功,返回false,反之是true
    • 删除元素:
      • remove(e):返回true表示已成功删除,否则返回false
      • poll():如果队列为空返回null,否则返回队列中的第一个元素
      • take():获取队列中的第一个元素,如果队列为空,获取元素的线程变为阻塞状态
      • poll(time, unit):当队列为空时,线程被阻塞,如果超过指定时间,线程退出
    • 检查元素:
      • element():获取队头元素,如果元素为null,抛出NoSuchElementException
      • peek():获取队头元素,如果队列为空返回null,否则返回目标元素

    ArrayBlockingQueue

    底层基于数组的有界阻塞队列,在构造此队列时必须指定容量;

    构造函数

    // 第一个  
    public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    
        // 第二个
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        // 第三个
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
    • capacity:队列的初始容量
    • fair:线程访问队列的公平性。如果为true按照FIFO的原则处理,反之;默认为false
    • c:已有元素的集合,类型于合并两个数组

    put()方法

       public void put(E e) throws InterruptedException {
             // 检查元素是否为null
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lockInterruptibly();
            try {
                // 如果当前队列为空,变为阻塞状态
                while (count == items.length)
                    notFull.await();
                // 反之,就添加元素
                enqueue(e);
            } finally {
                // 解锁
                lock.unlock();
            }
        }
    
        private void enqueue(E x) {
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            // 此时队列不为空,唤醒消费者
            notEmpty.signal();
        }
    

    take()方法

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lockInterruptibly();
            try {
                // 如果队列为空,消费者变为阻塞状态
                while (count == 0)
                    notEmpty.await();
                // 不为空,就获取数据
                return dequeue();
            } finally {
                // 解锁
                lock.unlock();
            }
        }
    
            private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            // 获取队头元素x
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
             // 此时队列没有满,同时生产者继续添加数据
            notFull.signal();
            return x;
        }
    

    LinkedBlockingQueue

    底层基于单向链表的无界阻塞队列,如果不指定初始容量,默认为Integer.MAX_VALUE,否则为指定容量

    构造函数

        // 不指定容量     
        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
        // 指定容量
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    
        // 等同于合并数组
        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }
    

    put()方法

        public void put(E e) throws InterruptedException {
            // 元素为空,抛出异常
            if (e == null) throw new NullPointerException();
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            // 获取队列中的数据量
            final AtomicInteger count = this.count;
            // 获取锁
            putLock.lockInterruptibly();
            try {
                // 队列满了,变为阻塞状态
                while (count.get() == capacity) {
                    notFull.await();
                }
                // 将目标元素添加到链表的尾端
                enqueue(node);
                // 总数增加
                c = count.getAndIncrement();
                // 队列还没有满,继续添加元素
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                // 解锁
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    

    take()方法

        public E take() throws InterruptedException {
            E x;
            int c = -1;
            // 获取队列中的工作数
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            // 获取锁
            takeLock.lockInterruptibly();
            try {
                // 如果队列为空,变为阻塞状态
                while (count.get() == 0) {
                    notEmpty.await();
                }
                // 获取队头元素
                x = dequeue();
                // 递减
                c = count.getAndDecrement();
                // 通知消费者
                if (c > 1)
                    notEmpty.signal();
            } finally {
                // 解锁
                takeLock.unlock();
            }
            if (c == capacity)
                // 
                signalNotFull();
            return x;
        }
    

    对比

    相同点

    1. 两者都是通过Condition通知生产者和消费者完成元素的添加和获取
    2. 都可以指定容量

    不同点

    1. ArrayBlockingQueue基于数据,LinkedBlockingQueue基于链表
    2. ArrayBlockingQueue内有一把锁,LinkedBlockingQueue内有两把锁
      微信截图_20210614170048.png
    微信截图_20210614170154.png

    自己动手实现一个阻塞队列

    通过分析源码可以知道,阻塞队列其实是通过通知机制Condition完成生产者和消费的互通。也可以通过Object类中的wait()notifynotifyAll实现。下面是自己写的一个阻塞队列

    public class BlockQueue {
        // 对象锁
        public static final Object LOCK = new Object();
        // 控制变量的值 来通知双方
        public boolean condition;
        
        public void put() {
            synchronized (LOCK) {
                while (condition) {
                    try {
                        // 满了
                        System.out.println("put   队列满了,开始阻塞");
                        LOCK.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                condition = true;
                System.out.println("put   改为true,唤醒消费者");
                LOCK.notifyAll();
            }
        }
    
    
        public void take() {
            synchronized (LOCK) {
                while (!condition) {
                    // 没满
                    System.out.println("take   队列没满,开始阻塞");
                    try {
                        LOCK.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                condition = false;
                System.out.println("take   改为false,唤醒生产者");
                LOCK.notifyAll();
            }
        }
    }
    

    参考文章:

    并发容器之BlockingQueue (juejin.cn)

    BlockingQueue (Java Platform SE 8 ) (oracle.com)


    阅读原文

    相关文章

      网友评论

          本文标题:java中的阻塞队列

          本文链接:https://www.haomeiwen.com/subject/bkuoeltx.html