美文网首页
手写一个堵塞队列

手写一个堵塞队列

作者: xin激流勇进 | 来源:发表于2020-08-27 11:28 被阅读0次

定义队列接口

package cn.school.myconllection;

public interface Queue<E> {
    void offer(E e);

    E take();

    int size();
}

队列实现

package cn.school.myconllection;


import java.util.LinkedList;
import java.util.Random;

public class MyBlockingQueue<E> implements Queue<E> {

    private final int maxSize;
    private final LinkedList<E> elementQueue = new LinkedList<>();
    private final static int DEFAULT_MAX_SIZE = 10;

    /**
     * 阻塞队列中最多可以装入的元素数量
     * @param maxSize
     */
    public MyBlockingQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    public MyBlockingQueue() {
        this(DEFAULT_MAX_SIZE);
    }

    /**
     * 如果阻塞队列内部的容器元素大于或者等于maxSize,此时等待消费者将元素
     * 取出
     * @param e 放入阻塞队列中的元素
     */
    @Override
    public void offer(E e) {
        synchronized (elementQueue) {
            if (elementQueue.size() >= maxSize) {
                try {
                    console(null, "wait");
                    elementQueue.wait();
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
            elementQueue.addLast(e);
            console(e, "offer");
            elementQueue.notifyAll();
        }
    }

    /**
     * 如果内部容器是空的,等待消费者生产元素。
     * 无论是添加还是获取元素的操作,在哪之后都需要
     * 主动调用notifyAll,让等待取消
     * @return
     */
    @Override
    public E take() {
        synchronized (elementQueue) {
            if(elementQueue.isEmpty()) {
                try {
                    console(null, "wait");
                    elementQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            E e = elementQueue.removeFirst();
            console(e, "take");
            elementQueue.notifyAll();
            return e;
        }
    }

    @Override
    public int size() {
        synchronized (elementQueue) {
            return elementQueue.size();
        }
    }

    public void console(E e, String flag) {
        System.out.println(Thread.currentThread().getName() + ":" +flag + ":" + e);
    }

    public static void main(String[] args) {
        MyBlockingQueue<Integer> blockingQueue = new MyBlockingQueue<>();
        new Thread(() -> {
            for(;;) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.offer(new Random().nextInt());
            }
        }, "product1").start();

        new Thread(() -> {
            for(;;) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.offer(new Random().nextInt());
            }
        }, "product2").start();

        new Thread(() -> {
            for(;;) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.offer(new Random().nextInt());
            }
        }, "product3").start();

        new Thread(() -> {
            for(;;) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
            }
        }, "consumer").start();
    }
}

相关文章

网友评论

      本文标题:手写一个堵塞队列

      本文链接:https://www.haomeiwen.com/subject/vulcsktx.html