美文网首页
ArrayBlockingQueue实现原理

ArrayBlockingQueue实现原理

作者: sandy_cheng | 来源:发表于2020-06-07 23:45 被阅读0次

    概述

    ArrayBlockingQueue是常用的的FIFO阻塞队列,实现了BlockingQueue接口,是线程安全的。内部主要通过数组(Array)、锁(ReentrantLock)实现。
    注:此队列中不能存放null元素。

    常用方法

    数据写入

    boolean add(E e)

    插入成功返回true,失败时抛出异常,add方法调用了父类AbstractQueue的add方法:

    public boolean add(E e){
       return super.add(e);
    }
    

    AbstractQueue的add方法:

    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    

    可见,add方法最终调用了本类的offer方法,当offer失败时,抛出"Queue full"异常,成功时返回true。

    boolean offer(E e)

    offer方法不会阻塞写入,写入成功返回true,当队列已满时,写入数据失败返回false,在写入过程中加锁,在使用offer写入数据是线程安全的。

    public boolean offer(E e) {
            //非空验证,如果为空时抛出空指针异常
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //验证写入数量是否等于数组长度,如果相等,不进行写入
                if (count == items.length) 
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    

    boolean offer(E e, long time, timeunit unit)

    方法加入了超时等待功能,在设定时间内会进行自旋等待,在设定时间内如果没有写入成功,返回false,写入成功返回true,此方法会响应中断:

     public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            //非空验证,如果为空,会抛出异常  
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
             //可中断锁,响应中断 
            lock.lockInterruptibly();
            try {
                //自旋等待
                while (count == items.length) {
                    //超时,返回false写入失败
                    if (nanos <= 0)
                        return false;
                    //使用condition进行等待,此时线程进行time-waiting状态
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }
    

    put(E e)

    put方法在添加元素时,如果队列已满,会进入阻塞状态,此方法会响应中断:

    public void put(E e) throws InterruptedException {
            //非空验证,如果为空,会抛出异常
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            //可响应中断锁
            lock.lockInterruptibly();
            try {
                //自旋
                while (count == items.length)
                    //无限等待,直到notFull.signal或signalAll方法唤醒
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        } 
    

    equeue(E e)

    私有方法,equeue是写入底层方法,此方法直接对数组进行操作,将新元素写入到数组中:

        private void enqueue(E x) {
            //this.items即为存储元素的数组
            final Object[] items = this.items;
            //将元素按照顺序写入到数组的指定位置,并更新数组下标(加1)
            items[putIndex] = x;
            //++putIndex将putIndex加1并返回加1后的结果
            if (++putIndex == items.length)
                //当下标更新后达到数长度是,从数组下标为0开始写入,循环往复
                putIndex = 0;
            count++;
            //写入成功后,通知消费线程可继续进行消费
            notEmpty.signal();
        }
    

    数据读取

    E take()

    从阻塞队列中读取一个元素,如果队列中无元素,将进入阻塞状态,如果超时返回null,此方法可响应中断:

    //此方法与put方法类似,dequeue方法是从数组中顺序取出一个元素
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //此时线程状态是waiting
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    E poll(long timeout, TimeUnit unit)

    在设定时间内取出元素,如果超时,返回为空:

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        //可响应中断锁
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                //超时,则返回为空
                if (nanos <= 0)
                    return null;
                //进入超时等待
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    E peek()

    获取队列头部元素,此方法不会阻塞,如果队列中无元素,返回为空,此方法不会移除队列中的元素:

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty    
        } finally {
            lock.unlock();
        }
    }
    

    E element()

    此方法是AbstractQueue的的方法,ArrayBlockingQueue没有对其重写,element方法调用了子类的peek方法,所以不会删除队列中的元素,当队列为空时,抛出NoSuchElementException:

    public E element() {
        //调用子类的peek方法
        E x = peek();
        if (x != null)
            return x;
        else        
          throw new NoSuchElementException();
    }
    

    boolean remove(Object o)

    删除一个元素,如果要删除的元素为null或者不存在,返回false,否则返回true,此方法不会阻塞线程:

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        //不可中断锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                //循环比较元素并删除
                do {
                    if (o.equals(items[i])) {
                        //删除指定元素,并重新对数组元素位置进行调整保证FIFO
                        removeAt(i);
                        //删除成功返回true
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    

    其它方法

    boolean contains(Object o)

    判断是否包含一个元素,是返回true,否返回false,此方法需要遍历队列中的元素,并比较,性能较低:

        public boolean contains(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                    final int putIndex = this.putIndex;
                    int i = takeIndex;
                    do {
                        if (o.equals(items[i]))
                            return true;
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    

    int drainTo(Collection<? super E> c)

    一次性获取队列中所有元素,内部调用了int drainTo(Collection<? super E> c, int maxElements)方法,maxElements参数为Integer.MAX_VALUE,优点是一次获取队列中所有元素,此操作会移除队列中锁获取的元素,避免了多次加锁造成的性能开销

    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
    

    int drainTo(Collection<? super E> c, int maxElements)

    从头部获取指定数量的元素,获取的元素将会被从队列中移除。

    相关文章

      网友评论

          本文标题:ArrayBlockingQueue实现原理

          本文链接:https://www.haomeiwen.com/subject/ilqktktx.html