美文网首页
java基础-手写ArrayBlockingQueue

java基础-手写ArrayBlockingQueue

作者: 巨子联盟 | 来源:发表于2018-07-26 16:40 被阅读0次

    • ArrayBlockingQueue

    package com.byedbl.queue;
    import java.io.Serializable;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * A bounded, FIFO blocking queue backed by a fixed-size circular array.
     *
     * <p>All state ({@code items}, {@code takeIndex}, {@code putIndex},
     * {@code count}) is guarded by a single {@link ReentrantLock}. Two
     * conditions created from that lock are used to park producers while the
     * queue is full and consumers while it is empty. {@code null} elements are
     * rejected because {@code null} is used as the "nothing available" return
     * value of {@link #poll()} and {@link #peek()}.
     *
     * @param <E> the type of elements held in this queue
     */
    public class MyArrayBlockingQueue<E> implements Serializable {
        private static final long serialVersionUID = 102615140181641502L;

        /** Circular buffer holding the queued elements; unused slots are null. */
        private final E[] items;

        /** items index for next take, poll, peek or remove. */
        private int takeIndex;

        /** items index for next put, offer, or add. */
        private int putIndex;

        /** Single lock guarding all access to the queue state. */
        private final ReentrantLock lock;

        /** Condition for waiting takes */
        private final Condition notEmpty;
        /** Condition for waiting puts */
        private final Condition notFull;

        /** Number of elements currently in the queue. */
        private int count;

        /**
         * Creates a queue with the given fixed capacity.
         *
         * @param capacity the maximum number of elements the queue can hold
         * @throws IllegalArgumentException if {@code capacity} is not positive
         */
        // The array is only ever accessed through this class, which stores
        // nothing but E instances in it, so the generic-array cast is safe.
        @SuppressWarnings("unchecked")
        public MyArrayBlockingQueue(int capacity) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = (E[]) new Object[capacity];
            lock = new ReentrantLock();
            notEmpty = lock.newCondition();
            notFull = lock.newCondition();
        }

        /**
         * Inserts the element at the tail of the queue without blocking.
         *
         * @param e the element to add
         * @return {@code true} always (failure is reported by exception)
         * @throws IllegalStateException if the queue is full
         * @throws NullPointerException if {@code e} is null
         */
        public boolean add(E e) {
            if (offer(e)) {
                return true;
            } else {
                throw new IllegalStateException("Queue full");
            }
        }

        /**
         * Inserts the element at the tail of the queue if there is room,
         * without blocking.
         *
         * @param e the element to add
         * @return {@code true} if the element was added, {@code false} if the
         *         queue is full
         * @throws NullPointerException if {@code e} is null
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    insert(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }

        /** Circularly increments an index, wrapping to 0 at the array end. */
        private final int inc(int i) {
            return (++i == items.length) ? 0 : i;
        }

        /**
         * Stores the element at {@code putIndex}, advances the index and wakes
         * one waiting consumer. Call only while holding the lock and when the
         * queue is not full.
         */
        private void insert(E e) {
            items[putIndex] = e;
            putIndex = inc(putIndex);
            ++count;
            // Wake one thread waiting for the queue to become non-empty.
            notEmpty.signal();
        }

        /**
         * Removes and returns the element at {@code takeIndex}, advances the
         * index and wakes one waiting producer. Call only while holding the
         * lock and when the queue is not empty.
         */
        private E extract() {
            final E[] items = this.items;
            E e = items[takeIndex];
            // Clear the slot so the queue does not keep the element reachable.
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return e;
        }

        /**
         * Inserts the element at the tail of the queue, waiting up to the given
         * time for space to become available.
         *
         * @param e the element to add
         * @param timeout how long to wait before giving up
         * @param unit the unit of {@code timeout}
         * @return {@code true} if the element was added, {@code false} if the
         *         wait time elapsed while the queue was still full
         * @throws InterruptedException if interrupted while waiting
         * @throws NullPointerException if {@code e} is null
         */
        public boolean offer(E e, long timeout, TimeUnit unit)
                throws InterruptedException {
            if (e == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // Room available: add the new element.
                    if (count != items.length) {
                        insert(e);
                        return true;
                    }
                    // Out of time: give up.
                    if (nanos <= 0) {
                        return false;
                    }
                    try {
                        // awaitNanos returns the remaining wait time; a value
                        // <= 0 means the timeout elapsed, a positive value
                        // means we were woken early and may wait again.
                        nanos = notFull.awaitNanos(nanos);
                    } catch (InterruptedException e1) {
                        // Pass a possibly consumed signal on to another waiter.
                        notFull.signal();
                        throw e1;
                    }
                }
            } finally {
                lock.unlock();
            }
        }

        /**
         * Inserts the element at the tail of the queue, waiting as long as
         * necessary for space to become available.
         *
         * @param e the element to add
         * @throws InterruptedException if interrupted while waiting
         * @throws NullPointerException if {@code e} is null
         */
        public void put(E e) throws InterruptedException {
            if (e == null)
                throw new NullPointerException();
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    // Loop guards against spurious wakeups and lost races.
                    while (count == items.length) {
                        notFull.await();
                    }
                } catch (InterruptedException e1) {
                    // Pass a possibly consumed signal on to another waiter.
                    notFull.signal();
                    throw e1;
                }
                insert(e);
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the head of the queue, waiting as long as
         * necessary for an element to become available.
         *
         * @return the head element
         * @throws InterruptedException if interrupted while waiting
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    // Loop guards against spurious wakeups and lost races.
                    while (count == 0) {
                        notEmpty.await();
                    }
                } catch (InterruptedException e1) {
                    // Pass a possibly consumed signal on to another waiter.
                    notEmpty.signal();
                    throw e1;
                }
                return extract();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the head of the queue without blocking.
         *
         * @return the head element, or {@code null} if the queue is empty
         */
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == 0) {
                    return null;
                }
                return extract();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the head of the queue, waiting up to the given
         * time for an element to become available.
         *
         * <p>An element that is already available is returned immediately, even
         * when {@code timeout} is zero or negative.
         *
         * @param timeout how long to wait before giving up
         * @param unit the unit of {@code timeout}
         * @return the head element, or {@code null} if the wait time elapsed
         *         while the queue was still empty
         * @throws InterruptedException if interrupted while waiting
         */
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                // Re-check emptiness after every wakeup: the wakeup may be
                // spurious, or another consumer may have taken the element.
                while (count == 0) {
                    if (nanos <= 0) {
                        return null;
                    }
                    try {
                        nanos = notEmpty.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        // Pass a possibly consumed signal on to another waiter.
                        notEmpty.signal();
                        throw e;
                    }
                }
                return extract();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns the head of the queue without removing it.
         *
         * @return the head element, or {@code null} if the queue is empty
         */
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : items[takeIndex];
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns the number of elements currently in the queue.
         *
         * @return the current element count
         */
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes the first element (in FIFO order) equal to the given one.
         *
         * @param e the element to remove
         * @return {@code true} if an element was removed
         * @throws NullPointerException if {@code e} is null
         */
        public boolean remove(E e) {
            if (e == null)
                throw new NullPointerException();
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = takeIndex;
                int k = 0;
                for (;;) {
                    // k counts inspected elements; stop after all `count`.
                    if (k++ >= count)
                        return false;
                    if (e.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    i = inc(i);
                }
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes the element at array index {@code i} and wakes one waiting
         * producer. Call only while holding the lock, with {@code i} pointing
         * at an occupied slot.
         */
        private void removeAt(int i) {
            final E[] items = this.items;
            if (i == takeIndex) {
                // Removing the head: just advance takeIndex.
                items[takeIndex] = null;
                takeIndex = inc(takeIndex);
            } else {
                // Removing from the interior: slide every later element back
                // by one slot, then pull putIndex back onto the freed slot.
                for (;;) {
                    int nexti = inc(i);
                    if (nexti != putIndex) {
                        items[i] = items[nexti];
                        i = nexti;
                    } else {
                        items[i] = null;
                        putIndex = i;
                        break;
                    }
                }
            }
            --count;
            notFull.signal();
        }

        /**
         * Tests whether the queue contains an element equal to the given one.
         *
         * @param o the object to look for
         * @return {@code true} if a matching element is present; {@code false}
         *         otherwise, including when {@code o} is null
         */
        public boolean contains(Object o) {
            if (o == null) return false;
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = takeIndex;
                int k = 0;
                while (k++ < count) {
                    if (o.equals(items[i]))
                        return true;
                    i = inc(i);
                }
                return false;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns the queued elements in FIFO order, formatted as
         * {@code [e1, e2, ...]}, taken as a consistent snapshot under the lock.
         *
         * @return a string representation of the queue contents
         */
        @Override
        public String toString() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == 0) {
                    return "[]";
                }
                final StringBuilder sb = new StringBuilder("[");
                int i = takeIndex;
                for (int k = 0; k < count; k++) {
                    if (k > 0) {
                        sb.append(", ");
                    }
                    final E e = items[i];
                    // Avoid infinite recursion if the queue contains itself.
                    if (e == this) {
                        sb.append("(this Queue)");
                    } else {
                        sb.append(e);
                    }
                    i = inc(i);
                }
                return sb.append(']').toString();
            } finally {
                lock.unlock();
            }
        }

    }
    
    
    • 测试

    package com.byedbl.queue;
    
    import java.util.Random;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Manual demo for {@code MyArrayBlockingQueue}: one consumer thread and one
     * producer thread take turns polling and adding values, handing control to
     * each other through two conditions of a shared lock.
     */
    public class MyArrayBlockingQueueTest {

        /**
         * Pre-fills a capacity-2 queue, runs the consumer/producer ping-pong
         * for 50 seconds, stops both workers and prints the final queue state.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while
         *         sleeping or while joining the workers
         */
        public static void main(String[] args) throws InterruptedException {
            final ReentrantLock lock = new ReentrantLock();
            final Condition producer = lock.newCondition();
            final Condition consumer = lock.newCondition();

            final MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<Integer>(2);

            // Start with a full queue.
            for (int i = 0; i < 2; i++) {
                queue.add(i);
            }

            Thread c = new Thread(new Runnable() {

                @Override
                public void run() {
                    // Run until main interrupts this thread.
                    while (!Thread.currentThread().isInterrupted()) {
                        lock.lock();
                        try {
                            if (queue.size() > 0) {
                                Integer value = queue.poll();
                                System.out.println("poll a value:" + value);
                                // Hand the turn to the producer, then wait for ours.
                                producer.signal();
                                try {
                                    consumer.await();
                                } catch (InterruptedException e) {
                                    // Restore the flag and stop the worker.
                                    Thread.currentThread().interrupt();
                                    return;
                                }
                            }
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            });
            c.start();

            Thread p = new Thread(new Runnable() {

                @Override
                public void run() {
                    final Random random = new Random();
                    // Run until main interrupts this thread.
                    while (!Thread.currentThread().isInterrupted()) {
                        int value = random.nextInt(100);
                        lock.lock();
                        try {
                            // offer (not add): if this thread wins the lock
                            // before the consumer has polled, the queue is
                            // still full and add would throw, killing the
                            // producer and stalling the consumer forever.
                            if (!queue.contains(value) && queue.offer(value)) {
                                System.out.println("add a value:" + value);
                                // Hand the turn to the consumer, then wait for ours.
                                consumer.signal();
                                try {
                                    producer.await();
                                } catch (InterruptedException e) {
                                    // Restore the flag and stop the worker.
                                    Thread.currentThread().interrupt();
                                    return;
                                }
                            }
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            });

            p.start();
            Thread.sleep(50000);

            // Stop both workers so the JVM can exit and the printed state is
            // a stable snapshot.
            c.interrupt();
            p.interrupt();
            c.join();
            p.join();

            System.out.println(queue.size());
            System.out.println(queue.toString());
        }
    }
    
    

    相关文章

      网友评论

          本文标题:java基础-手写ArrayBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/zzvbmftx.html