美文网首页
阻塞队列ArrayBlockingQueue

阻塞队列ArrayBlockingQueue

作者: 萍水相逢_程序员 | 来源:发表于2018-09-19 16:48 被阅读0次

    ArrayBlockingQueue说明

    一个由数组结构组成的有界阻塞队列。 队列的元素是FIFO(first in first out)。新进入的元素插入队列的尾,从队列的头返回元素。队列大小一旦创建后,就不能改变。如果队列满了,put元素会阻塞,如果take空队列会堵塞等待。

    加入数据和取数据 公用一个锁,这个操作无法并行,吞吐量有些影响,插入或删除元素时不会产生或销毁任何额外的对象实例,性能会比队列的好些;

    队列接口设计

    ArrayBlockingQueue源码分析

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable{
            
    
            final Object[] items;
            
            //take poll peek remove
            int takeIndex;
            //put offer add
            int putIndex;
            
            //队列里元素的数量
            int count;
            
            //保护访问的锁
            final ReentrantLock lock;
            
            //Condition for waiting takes
            private final Condition notEmpty;
            
            //Condition for waiting puts
            private final Condition notFull;
            
            //构造方法
            public ArrayBlockingQueue(int capacity){
                
                this(capacity,false);
            }
            
            public ArrayBlockingQueue(int capacity,boolean fair){
                
                if(capacity <= 0){
                    throw new IllegalArgumentException();
                }
                this.items = new Object[capacity];
                lock = new ReentrantLock(fair);
                notEmpty = new lock.newCondition();
                notFull = new lock.newCondition();
            }
            
            
            public ArrayBlockingQueue(int capacity,boolean fair, Collection<? extends E> c){
                
                this(capacity,fair);
                
                final ReentrantLock = this.lock;
                lock.lock()//加锁仅仅是为了可见  而不是互斥。 主要是在不同线程对数据操作都要写在主内存 而不是缓存中
                
                try{    
                    int i = 0;
                    try{
                        
                        for(E e : c)
                            items[i++] = Objects.requireNotNull(e);
                    }catch(ArrayIndexOutOfBoundsException ex){
                        throw new IllegalArgumentException();
                    }
                    
                    count = i;
                    putIndex = (i == capacity)?0:1;
                    
                }finally{
                    lock.unlock()
                }
            }
            
            // 不阻塞  队列已经满了会排出异常,IllegalStateException
            public boolean add(E e) {
                 //调用到offer  
                return super.add(e);
            }
            
            // 不阻塞  队列已经满了 返回false
            public boolean  offer(E e){
                
                Objects.requireNotNull(e);
                final ReentrantLock lock = this.lock;
                lock.lock();
                try{
                    if(count == items.length)
                        return fale
                    else{
                        enqueue(e);
                        return true;
                    }
                }finally{
                    lock.unlock();
                }
                
            }
            
            //如果队列满了 会等待,直到变成可用
            public void put(E e) throw InterruptedException{
                Objects.requireNotNull(e);
                
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try{
                    while(count == items.length)
                        //等待
                        notFull.await();
                    enqueue(e);
                }finally{
                    lock.unlock();
                }
                
            }
            
            public void offer(E e, long timeout, TimeUnit unit) throws InterruptedException{
                
                Objects.requireNotNull(e);
                //  毫微秒  十亿分之一秒
                long nanos = unit.toNanos(timeout)
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try{
                    while(count == items.length){
                        if(nanos <= 0L)
                            return false;
                        //返回值表示剩余时间
                        nanos = notFull.awaitNanos(nanos);
                    }
                    enqueue(e);
                    return true;
                }finally{
                    lock.unlock();
                }
                 
                
            }
            
            //不阻塞 队列为空 直接返回null
            public E poll(){
                final ReentrantLock lock = this.lock;
                lock.lock();
                try{
                    return (count == 0) ? null :dequeue();
                }finally{
                    lock.unlock();
                }
                
            }
            
            //队列为空  则一直等待, 直到被唤醒
            public E take() throws InterruptedException{
                
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try{
                    
                    while(count == 0)
                        notEmpty.await();
                    return dequeue();
                }finally{
                    lock.unlock();
                }
                
            }
            
            //如果队列空,等待,如果超时则返回null
            public E poll(long timeout, TimeUnit unit){
                
                long nanos = unit.toNanos(timeout);
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try{
                    while(count == 0){
                        if(nanos <= 0){
                            return null;
                        }
                        nanos = notEmpty.awaitNanos(nanos);
                        
                    }
                    return dequeue();
                }finally{
                    lock.unlock();
                }
                
            }
            
            
            //如果队列为空  则返回null
            public E peek(){
                
                final ReentrantLock lock = this.lock;
                lock.lock();
                try{
                    return itemAt(takeIndex);
                }finally{
                    lock.unlock();
                }
                
            }
            
            //数组的末尾加入元素
            private void enqueue(E x){
                
                final Object[] items = this.items;
                items[putIndex] = x;
                if(putIndex == items.length)
                    putIndex = 0;
                count++;
                notEmpty.signal();
            }
            
            private E dequeue(){
                
                    final Object[] items = this.items;
                    E x = (E)items[takeIndex];
                    items[takeIndex] = null;
                    if(++takeIndex == items.length)
                        takeIndex = 0;
                    count--;
                      if (itrs != null)
                            itrs.elementDequeued();
                    notFull.signal();
                    return x;
            }
            
        }
    

    相关文章

      网友评论

          本文标题:阻塞队列ArrayBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/baiinftx.html