美文网首页
Java生产者消费者实现与ArrayBlockingQueue

Java生产者消费者实现与ArrayBlockingQueue

作者: 柚子过来 | 来源:发表于2018-06-20 00:12 被阅读0次

生产者消费者模式中主要是两点:1、当没有资源时消费者阻塞 。2、当生产者生产资源后需要通知消费者。在Java中利用锁以及wait/notify操作可以简单实现该模式,代码如下:

 public class CP {
    private static ArrayList<String> cp = new ArrayList<>();
    private static Object lock = new Object();

     static class Consumer implements Runnable{
        @Override
        public void run() {
            synchronized (lock) {
                while (true) {
                    while (cp.isEmpty()){      //防止虚假唤醒
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("consuming"+cp.remove(0));
                }
            }

        }
    }

    static class Producer implements Runnable{
        @Override
        public void run() {
            for(int i = 0;i<5;i++) {
                synchronized (lock) {
                    cp.add("hello");
                    System.out.println("add hello");
                    lock.notify();
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        Thread pro = new Thread(producer);
        Thread con = new Thread(consumer);
        pro.start();
        con.start();
    }
}

除了内置锁与wait/notify实现以外,还可以使用显示锁与await/signal实现,在Java同步队列ArrayBlockingQueue中就是一个很好的实现例子,并且在ArrayBlockingQueue体现了多个Condition协调作用的优势。

ArrayBlockingQueue定义了三个与同步生产消费相关的域,通过notEmpty、notFull来控制生产与消费的阻塞与唤醒:

 /** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

队列有很多入队出队的方法,这里主要看put和take:

put:

队列满时put操作会阻塞,当有出队操作发生会调用signal方法唤醒阻塞的put。当put成功时同样会调用signal方法唤醒阻塞的take。

  public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();        //队满阻塞
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;
    count++;
    notEmpty.signal();    //唤醒因队空阻塞的线程
}
take:

队列空时,take操作会阻塞,当有put操作完成并调用signal之后,take才会被唤醒。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

 private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}

与ArrayBlockingQueue类似的还有LinkedBlockingQueue,它使用两个锁来并行控制put与take的同步(在ArrayBlockingQueue中因为是使用同一个lock,所以put和take不能同时被执行),所以效率高一点:

transient LinkedBlockingQueue.Node<E> head;
private transient LinkedBlockingQueue.Node<E> last;
private final ReentrantLock takeLock;
private final Condition notEmpty;
private final ReentrantLock putLock;
private final Condition notFull;

相关文章

网友评论

      本文标题:Java生产者消费者实现与ArrayBlockingQueue

      本文链接:https://www.haomeiwen.com/subject/boazeftx.html