美文网首页
手写一个阻塞队列

手写一个阻塞队列

作者: xin激流勇进 | 来源:发表于2020-08-27 11:28 被阅读0次

    定义队列接口

    package cn.school.myconllection;
    
    /**
     * Minimal queue contract used by this package.
     *
     * <p>The interface itself only fixes the three operations below; ordering and
     * blocking behaviour are decided by the implementing class.
     *
     * @param <E> type of the elements held by the queue
     */
    public interface Queue<E> {

        /**
         * Puts an element into the queue.
         *
         * @param e the element to add
         */
        void offer(E e);

        /**
         * Removes an element from the queue and hands it to the caller.
         *
         * @return the removed element
         */
        E take();

        /**
         * Reports how many elements the queue currently holds.
         *
         * @return the current element count
         */
        int size();
    }
    
    

    队列实现

    package cn.school.myconllection;
    
    
    import java.util.LinkedList;
    import java.util.Random;
    
    /**
     * A bounded, FIFO blocking queue built on intrinsic locking: every operation
     * synchronizes on the backing {@link LinkedList} and uses {@code wait}/{@code notifyAll}
     * on that same monitor to coordinate producers and consumers.
     *
     * <p>{@link #offer(Object)} blocks while the queue holds {@code maxSize} elements and
     * {@link #take()} blocks while it is empty. Both re-check their condition in a loop
     * after every wakeup, so spurious wakeups and wakeups "stolen" by another thread
     * cannot break the capacity bound or cause a removal from an empty list.
     *
     * <p>Blocking is uninterruptible: if the waiting thread is interrupted, the operation
     * keeps waiting and the thread's interrupt status is restored before the method returns.
     *
     * @param <E> type of the elements held by the queue
     */
    public class MyBlockingQueue<E> implements Queue<E> {

        private static final int DEFAULT_MAX_SIZE = 10;

        private final int maxSize;
        // Also serves as the monitor for all synchronization in this class.
        private final LinkedList<E> elementQueue = new LinkedList<>();

        /**
         * Creates a queue that holds at most {@code maxSize} elements.
         *
         * @param maxSize maximum number of elements the queue may hold; must be positive
         * @throws IllegalArgumentException if {@code maxSize} is not positive (such a queue
         *         could never accept an element, so {@code offer} would block forever)
         */
        public MyBlockingQueue(int maxSize) {
            if (maxSize <= 0) {
                throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
            }
            this.maxSize = maxSize;
        }

        /** Creates a queue with the default capacity of {@code DEFAULT_MAX_SIZE} (10) elements. */
        public MyBlockingQueue() {
            this(DEFAULT_MAX_SIZE);
        }

        /**
         * Appends an element to the tail of the queue, waiting while the queue is full
         * until a consumer removes an element.
         *
         * <p>If the calling thread is interrupted while waiting, the wait continues and the
         * interrupt status is re-asserted before this method returns.
         *
         * @param e the element to put into the queue
         */
        @Override
        public void offer(E e) {
            boolean interrupted = false;
            try {
                synchronized (elementQueue) {
                    // Loop, not "if": the condition must be re-checked after every wakeup,
                    // because notifyAll wakes all waiters and wakeups may be spurious.
                    while (elementQueue.size() >= maxSize) {
                        try {
                            console(null, "wait");
                            elementQueue.wait();
                        } catch (InterruptedException ignored) {
                            // Remember the interrupt and keep waiting; adding now would
                            // exceed the capacity bound.
                            interrupted = true;
                        }
                    }
                    elementQueue.addLast(e);
                    console(e, "offer");
                    // Wake waiting consumers (and producers, which will re-check and re-wait).
                    elementQueue.notifyAll();
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Removes and returns the element at the head of the queue, waiting while the
         * queue is empty until a producer adds an element.
         *
         * <p>If the calling thread is interrupted while waiting, the wait continues and the
         * interrupt status is re-asserted before this method returns.
         *
         * @return the element removed from the head of the queue
         */
        @Override
        public E take() {
            boolean interrupted = false;
            try {
                synchronized (elementQueue) {
                    // Loop, not "if": another consumer may have emptied the queue again
                    // between the notification and this thread re-acquiring the monitor.
                    while (elementQueue.isEmpty()) {
                        try {
                            console(null, "wait");
                            elementQueue.wait();
                        } catch (InterruptedException ignored) {
                            // Remember the interrupt and keep waiting; removeFirst() on an
                            // empty list would throw NoSuchElementException.
                            interrupted = true;
                        }
                    }
                    E head = elementQueue.removeFirst();
                    console(head, "take");
                    // Wake waiting producers now that a slot is free.
                    elementQueue.notifyAll();
                    return head;
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Returns the number of elements currently in the queue.
         *
         * @return the current element count, read while holding the queue's monitor
         */
        @Override
        public int size() {
            synchronized (elementQueue) {
                return elementQueue.size();
            }
        }

        /**
         * Prints a trace line of the form {@code <thread name>:<flag>:<element>} to
         * standard output.
         *
         * @param e    the element involved, or {@code null} when none applies
         * @param flag short label for the event being traced
         */
        public void console(E e, String flag) {
            System.out.println(Thread.currentThread().getName() + ":" + flag + ":" + e);
        }

        /**
         * Demo: three producers (one pausing 1000 ms, two pausing 100 ms between offers)
         * feed random integers to a single consumer that pauses 1000 ms between takes,
         * so the queue fills up and the producers can be seen waiting.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            MyBlockingQueue<Integer> blockingQueue = new MyBlockingQueue<>();
            Random random = new Random();
            Runnable produce = () -> blockingQueue.offer(random.nextInt());

            startWorker("product1", 1000, produce);
            startWorker("product2", 100, produce);
            startWorker("product3", 100, produce);
            startWorker("consumer", 1000, blockingQueue::take);
        }

        /** Starts a named thread that repeatedly sleeps {@code pauseMillis} and then runs {@code action}. */
        private static void startWorker(String name, long pauseMillis, Runnable action) {
            new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(pauseMillis);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop the worker instead of swallowing the interrupt.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    action.run();
                }
            }, name).start();
        }
    }
    
    

    相关文章

      网友评论

          本文标题:手写一个阻塞队列

          本文链接:https://www.haomeiwen.com/subject/vulcsktx.html