美文网首页
阻塞队列ArrayBlockingQueue

阻塞队列ArrayBlockingQueue

作者: 萍水相逢_程序员 | 来源:发表于2018-09-19 16:48 被阅读0次

ArrayBlockingQueue说明

一个由数组结构组成的有界阻塞队列。 队列的元素是FIFO(first in first out)。新进入的元素插入队列的尾,从队列的头返回元素。队列大小一旦创建后,就不能改变。如果队列满了,put元素会阻塞,如果take空队列会堵塞等待。

加入数据和取数据 公用一个锁,这个操作无法并行,吞吐量有些影响,插入或删除元素时不会产生或销毁任何额外的对象实例,性能会比队列的好些;

队列接口设计

ArrayBlockingQueue源码分析

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable{
        

        final Object[] items;
        
        //take poll peek remove
        int takeIndex;
        //put offer add
        int putIndex;
        
        //队列里元素的数量
        int count;
        
        //保护访问的锁
        final ReentrantLock lock;
        
        //Condition for waiting takes
        private final Condition notEmpty;
        
        //Condition for waiting puts
        private final Condition notFull;
        
        //构造方法
        public ArrayBlockingQueue(int capacity){
            
            this(capacity,false);
        }
        
        public ArrayBlockingQueue(int capacity,boolean fair){
            
            if(capacity <= 0){
                throw new IllegalArgumentException();
            }
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = new lock.newCondition();
            notFull = new lock.newCondition();
        }
        
        
        public ArrayBlockingQueue(int capacity,boolean fair, Collection<? extends E> c){
            
            this(capacity,fair);
            
            final ReentrantLock = this.lock;
            lock.lock()//加锁仅仅是为了可见  而不是互斥。 主要是在不同线程对数据操作都要写在主内存 而不是缓存中
            
            try{    
                int i = 0;
                try{
                    
                    for(E e : c)
                        items[i++] = Objects.requireNotNull(e);
                }catch(ArrayIndexOutOfBoundsException ex){
                    throw new IllegalArgumentException();
                }
                
                count = i;
                putIndex = (i == capacity)?0:1;
                
            }finally{
                lock.unlock()
            }
        }
        
        // 不阻塞  队列已经满了会排出异常,IllegalStateException
        public boolean add(E e) {
             //调用到offer  
            return super.add(e);
        }
        
        // 不阻塞  队列已经满了 返回false
        public boolean  offer(E e){
            
            Objects.requireNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try{
                if(count == items.length)
                    return fale
                else{
                    enqueue(e);
                    return true;
                }
            }finally{
                lock.unlock();
            }
            
        }
        
        //如果队列满了 会等待,直到变成可用
        public void put(E e) throw InterruptedException{
            Objects.requireNotNull(e);
            
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try{
                while(count == items.length)
                    //等待
                    notFull.await();
                enqueue(e);
            }finally{
                lock.unlock();
            }
            
        }
        
        public void offer(E e, long timeout, TimeUnit unit) throws InterruptedException{
            
            Objects.requireNotNull(e);
            //  毫微秒  十亿分之一秒
            long nanos = unit.toNanos(timeout)
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try{
                while(count == items.length){
                    if(nanos <= 0L)
                        return false;
                    //返回值表示剩余时间
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            }finally{
                lock.unlock();
            }
             
            
        }
        
        //不阻塞 队列为空 直接返回null
        public E poll(){
            final ReentrantLock lock = this.lock;
            lock.lock();
            try{
                return (count == 0) ? null :dequeue();
            }finally{
                lock.unlock();
            }
            
        }
        
        //队列为空  则一直等待, 直到被唤醒
        public E take() throws InterruptedException{
            
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try{
                
                while(count == 0)
                    notEmpty.await();
                return dequeue();
            }finally{
                lock.unlock();
            }
            
        }
        
        //如果队列空,等待,如果超时则返回null
        public E poll(long timeout, TimeUnit unit){
            
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try{
                while(count == 0){
                    if(nanos <= 0){
                        return null;
                    }
                    nanos = notEmpty.awaitNanos(nanos);
                    
                }
                return dequeue();
            }finally{
                lock.unlock();
            }
            
        }
        
        
        //如果队列为空  则返回null
        public E peek(){
            
            final ReentrantLock lock = this.lock;
            lock.lock();
            try{
                return itemAt(takeIndex);
            }finally{
                lock.unlock();
            }
            
        }
        
        //数组的末尾加入元素
        private void enqueue(E x){
            
            final Object[] items = this.items;
            items[putIndex] = x;
            if(putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
        
        private E dequeue(){
            
                final Object[] items = this.items;
                E x = (E)items[takeIndex];
                items[takeIndex] = null;
                if(++takeIndex == items.length)
                    takeIndex = 0;
                count--;
                  if (itrs != null)
                        itrs.elementDequeued();
                notFull.signal();
                return x;
        }
        
    }

相关文章

网友评论

      本文标题:阻塞队列ArrayBlockingQueue

      本文链接:https://www.haomeiwen.com/subject/baiinftx.html