美文网首页
java 固定大小的queue thread safe 线程安全

java 固定大小的queue thread safe 线程安全

作者: 乘以零 | 来源:发表于2017-07-14 10:46 被阅读0次

    特别适合用于统计场景(数据时刻更新,但只需要保留最近的若干条数据)。采用数组(环形缓冲区)实现;队列已满时,新元素会覆盖最旧的元素。

    
    package com.yiwugou.homer.core;
    
    import java.util.AbstractQueue;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Queue;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * A fixed-capacity FIFO queue backed by a circular array that retains only the
     * most recently offered elements.
     *
     * <p>Unlike a bounded blocking queue, {@link #offer(Object)} never fails and never
     * blocks: when the queue already holds {@code capacity} elements, the oldest
     * element is silently discarded to make room for the new one. This makes the class
     * convenient for "last N samples" style statistics.
     *
     * <p>Every public operation, and every iterator step that touches the backing
     * array, runs while holding a single {@link ReentrantLock}. Iterators do not throw
     * {@link java.util.ConcurrentModificationException}; they read live array slots as
     * they advance, so elements added or removed during iteration may or may not be
     * reflected.
     *
     * <p>{@code null} elements are not permitted.
     *
     * @param <E> the type of elements held in this queue
     *
     * @author zhanxiaoyong@yiwugou.com
     *
     * @since 2017-07-14 10:45:42
     */
    public class LimitQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {
    
        private static final long serialVersionUID = 4361189502222039104L;
    
        /** The queued items; a {@code null} slot is an empty slot. */
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        int nextTakeIndex;
    
        /** items index for next put, offer, or add */
        int nextPutIndex;
    
        /** Number of elements in the queue */
        int count;
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /**
         * Creates an empty queue that retains at most {@code capacity} elements.
         *
         * @param capacity the maximum number of elements kept; must be at least 1
         * @throws IllegalArgumentException if {@code capacity} is less than 1
         */
        public LimitQueue(int capacity) {
            // A zero-length backing array would make the first offer() fail with
            // ArrayIndexOutOfBoundsException, so reject it at construction time.
            if (capacity <= 0) {
                throw new IllegalArgumentException("LimitQueue capacity must be positive: " + capacity);
            }
            this.items = new Object[capacity];
            this.lock = new ReentrantLock();
        }
    
        /**
         * Appends the element at the tail. If the queue is already full, the head
         * (oldest) element is overwritten and dropped.
         *
         * @param e the element to add; must not be {@code null}
         * @return always {@code true}
         * @throws NullPointerException if {@code e} is {@code null}
         */
        @Override
        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (this.count == this.items.length) {
                    // Full: the put slot coincides with the head, so storing below
                    // overwrites the oldest element; move the head past it.
                    this.nextTakeIndex = this.inc(this.nextTakeIndex);
                } else {
                    this.count++;
                }
                this.items[this.nextPutIndex] = e;
                this.nextPutIndex = this.inc(this.nextPutIndex);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Removes and returns the oldest element.
         *
         * @return the head of the queue, or {@code null} if the queue is empty
         */
        @Override
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (this.count == 0) {
                    return null;
                }
                final int head = this.nextTakeIndex;
                final E taken = this.itemAt(head);
                this.items[head] = null; // free the slot
                this.nextTakeIndex = this.inc(head);
                --this.count;
                return taken;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns the oldest element without removing it.
         *
         * @return the head of the queue, or {@code null} if the queue is empty
         */
        @Override
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (this.count == 0) ? null : this.itemAt(this.nextTakeIndex);
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns an iterator over the elements from oldest to newest.
         *
         * @return a new iterator positioned at the current head
         */
        @Override
        public Iterator<E> iterator() {
            return new Itr();
        }
    
        /**
         * Returns the number of elements currently held.
         *
         * @return the element count, between 0 and the capacity inclusive
         */
        @Override
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return this.count;
            } finally {
                lock.unlock();
            }
        }
    
        /** Circularly increments {@code i}, wrapping to 0 at the end of the array. */
        final int inc(int i) {
            return (++i == this.items.length) ? 0 : i;
        }
    
        /** Circularly decrements {@code i}, wrapping to the last index below 0. */
        final int dec(int i) {
            return ((i == 0) ? this.items.length : i) - 1;
        }
    
        /**
         * Reads the slot at index {@code i} as an {@code E}. Only {@link #offer(Object)}
         * stores into {@link #items}, and it only stores {@code E} values, so the cast
         * cannot fail.
         */
        @SuppressWarnings("unchecked")
        private E itemAt(int i) {
            return (E) this.items[i];
        }
    
        /** Rejects {@code null} elements, which are reserved to mark empty slots. */
        private static void checkNotNull(Object v) {
            if (v == null) {
                throw new NullPointerException("LimitQueue item must not be null!");
            }
        }
    
        /**
         * Deletes the element at array index {@code i}. Call only while holding
         * {@link #lock}.
         *
         * @param i index of an occupied slot
         */
        void removeAt(int i) {
            final Object[] items = this.items;
            if (i == this.nextTakeIndex) {
                // Removing the head: just advance past it.
                items[this.nextTakeIndex] = null;
                this.nextTakeIndex = this.inc(this.nextTakeIndex);
            } else {
                // Removing from the middle: slide every later element back one slot,
                // then pull the put index back onto the slot that was vacated.
                for (;;) {
                    int nexti = this.inc(i);
                    if (nexti != this.nextPutIndex) {
                        items[i] = items[nexti];
                        i = nexti;
                    } else {
                        items[i] = null;
                        this.nextPutIndex = i;
                        break;
                    }
                }
            }
            --this.count;
        }
    
        /**
         * Iterator over the queue from head to tail. The number of elements to visit
         * is fixed when the iterator is created; each step re-reads the backing array
         * under the queue lock.
         */
        private class Itr implements Iterator<E> {
            private int remaining; // Number of elements yet to be returned
            private int nextIndex; // Index of element to be returned by next
            private E nextItem; // Element to be returned by next call to next
            private E lastItem; // Element returned by last call to next
            private int lastRet; // Index of last element returned, or -1 if none
    
            Itr() {
                final ReentrantLock lock = LimitQueue.this.lock;
                lock.lock();
                try {
                    this.lastRet = -1;
                    this.remaining = LimitQueue.this.count;
                    if (this.remaining > 0) {
                        this.nextIndex = LimitQueue.this.nextTakeIndex;
                        this.nextItem = LimitQueue.this.itemAt(this.nextIndex);
                    }
                } finally {
                    lock.unlock();
                }
            }
    
            @Override
            public boolean hasNext() {
                return this.remaining > 0;
            }
    
            /**
             * Returns the next element.
             *
             * @throws NoSuchElementException if the iteration has no more elements
             */
            @Override
            public E next() {
                final ReentrantLock lock = LimitQueue.this.lock;
                lock.lock();
                try {
                    if (this.remaining <= 0) {
                        throw new NoSuchElementException();
                    }
                    this.lastRet = this.nextIndex;
                    // Prefer whatever is in the slot right now over the cached value.
                    E x = LimitQueue.this.itemAt(this.nextIndex);
                    if (x == null) {
                        x = this.nextItem; // we are forced to report old value
                        this.lastItem = null; // but ensure remove fails
                    } else {
                        this.lastItem = x;
                    }
                    // Advance to the next occupied slot, skipping over nulls.
                    while (--this.remaining > 0) {
                        this.nextIndex = LimitQueue.this.inc(this.nextIndex);
                        this.nextItem = LimitQueue.this.itemAt(this.nextIndex);
                        if (this.nextItem != null) {
                            break;
                        }
                    }
                    return x;
                } finally {
                    lock.unlock();
                }
            }
    
            /**
             * Removes the element last returned by {@link #next()}, provided it is
             * still stored at the same array index; otherwise does nothing.
             *
             * @throws IllegalStateException if {@code next()} has not been called, or
             *         {@code remove()} was already called after the last {@code next()}
             */
            @Override
            public void remove() {
                final ReentrantLock lock = LimitQueue.this.lock;
                lock.lock();
                try {
                    int i = this.lastRet;
                    if (i == -1) {
                        throw new IllegalStateException();
                    }
                    this.lastRet = -1;
                    E x = this.lastItem;
                    this.lastItem = null;
                    // only remove if item still at index
                    if (x != null && x == LimitQueue.this.items[i]) {
                        boolean removingHead = (i == LimitQueue.this.nextTakeIndex);
                        LimitQueue.this.removeAt(i);
                        if (!removingHead) {
                            // Later elements slid back one slot, so the cursor must too.
                            this.nextIndex = LimitQueue.this.dec(this.nextIndex);
                        }
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    
    }
    
    
    

    相关文章

      网友评论

          本文标题:java 固定大小的queue thread safe 线程安全

          本文链接:https://www.haomeiwen.com/subject/wsynhxtx.html