美文网首页Java 杂谈技术干货
以LinkedBlockingQueue为例浅谈阻塞队列的实现

以LinkedBlockingQueue为例浅谈阻塞队列的实现

作者: LittleMagic | 来源:发表于2019-04-01 22:53 被阅读40次

目录

最近在阅读Spark源码的过程中,又重新接触到了一些Java并发方面的知识,于是就见缝插针地将它们记录下来,当做复习与备忘。

阻塞队列简介

阻塞队列的定义

根据Doug Lea在JavaDoc中的解释,所谓阻塞队列,就是在普通队列的基础之上,支持以下两种操作的队列:

  • 当某线程从队列获取元素时,如果队列为空,就等待(阻塞)直至队列中有元素;
  • 当某线程向队列插入元素时,如果队列已满,就等待(阻塞)直至队列中有空间。

也就是说,阻塞队列是自带同步机制的队列。它最常用来解决线程同步中经典的生产者-消费者问题,前面讲过的Spark Core异步事件总线中,就采用阻塞队列作为事件存储。

Java中的阻塞队列

Java中阻塞队列的基类是j.u.c.BlockingQueue接口,它继承自Queue接口,并且定义了额外的方法实现同步:

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

上述put()与offer()方法用于向队列插入元素,take()与poll()方法则是从队列获取元素。不同的是,put()与take()方法在插入/获取时,如果必须等待,就会一直阻塞下去;而offer()与poll()方法可以指定阻塞的时间长度。

以BlockingQueue接口为中心的继承关系如下图所示。


平时开发中比较常用的阻塞队列是基于数组实现的ArrayBlockingQueue,与基于单链表实现的LinkedBlockingQueue。本文选择后者来深入看一下阻塞队列的实现细节,因为它的性能在多数情况下更优,可以自行写benchmark程序来测测。

LinkedBlockingQueue

LinkedBlockingQueue(以下简称LBQ)是基于单链表实现的,先进先出(FIFO)的有界阻塞队列。

单链表定义

LBQ的单链表结点数据结构定义在静态内部类Node中。

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

在类的内部还定义了单链表的头结点与尾结点。

    transient Node<E> head;
    private transient Node<E> last;

head始终指向链表的第一个结点,该结点是哨兵结点,不存储数据,只标记链表的开始,即head.item == null。这样可以避免只有一个结点时造成混乱。
tail始终指向链表的最后一个结点,该结点是有数据的,并满足last.next == null

LBQ在队头获取及弹出元素,在队尾插入元素。

锁和等待队列

LBQ采用双锁机制保证入队和出队可以同时进行,互不干扰。

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

可见定义有两个ReentrantLock,takeLock用于控制出队,putLock用于控制入队。另外,还有这两个锁分别对应的条件变量notEmpty和notFull,分别维护出队和入队线程的等待队列。ReentrantLock和Condition都是Java AQS机制的重要组成部分,之后也会细说。

值得注意的是,在某些方法中需要同时对takeLock与putLock加锁与解锁,所以LBQ内部也提供了这样的方法。

    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

这两个方法总会成对调用,保证所有需要同时加锁和解锁的地方,其顺序都一致并且不可中断,也防止了前一个锁操作成功执行,后一个锁操作被打断导致死锁的风险。

另外,LBQ也对条件变量的Condition.signal()方法进行了简单封装,分别用来唤醒阻塞的出队操作线程和入队操作线程。

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
容量和计数
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();

capacity是LBQ的最大容量,可以在构造方法中随同名参数传入,默认值是Integer.MAX_VALUE。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

count则是LBQ内当前元素的计数,由于入队和出队动作可以并发执行,所以要用原子类型AtomicInteger保证线程安全。

入队操作

由于put()和offer()方法的逻辑基本相同,所以只看offer()方法就好了。

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

在入队时,首先将putLock加锁,然后用卫语句count.get() == capacity判断队列是否已满,若已满,则进入等待循环。当阻塞的时间超时后,判定入队操作失败,并返回false。
如果队列未满,或者在超时时间未到时有了空间,就调用enqueue()方法在队尾插入元素,并将计数器自增。入队后若还有更多的剩余空间,则唤醒其他等待的入队线程。
最后将putLock解锁,并检查由count.getAndIncrement()返回的值是否为0。如果为0,表示队列刚刚由空变为非空状态,因此也要唤醒等待的出队线程。

出队操作

同理,只看poll()方法。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

将讲解入队方法时的描述反着说一遍就行了:
在出队时,首先将takeLock加锁,然后用卫语句count.get() == 0判断队列是否为空,若为空,则进入等待循环。当阻塞的时间超时后,判定出队操作失败,并返回false。
如果队列不为空,或者在超时时间未到时进了新元素,就调用dequeue()方法弹出队头元素,并将计数器自减。出队后若还有更多的剩余元素,则唤醒其他等待的出队线程。
最后将takeLock解锁,并检查由count.getAndDecrement()返回的值是否为capacity。如果为capacity,表示队列刚刚由满变为不满状态,因此也要唤醒等待的入队线程。

需要操作双锁的情况

以remove()方法为例。

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

由于单链表删除结点涉及到对链表的遍历,以及对前驱和后继结点的断链和补链,因此必须将两个锁都加上,禁止一切修改。待删除成功后才能解锁,继续正常的入队和出队操作。

生产者-消费者问题示例

生产者-消费者问题的解决方法用操作系统理论中的信号量PV(wait-signal)原语描述如下:

semaphore filled = 0;
semaphore empty = BUF_CAPACITY;
mutex_semaphore mutex = 1;

procedure producer() {
  while (true) {
    item = produce();
    wait(empty);
    wait(mutex);
    buffer.put(item);
    signal(mutex);
    signal(filled);
  }
}

procedure consumer() {
  while (true) {
    wait(filled);
    wait(mutex);
    item = buffer.get();
    signal(mutex);
    signal(empty);
    consume(item);
  }
}

利用阻塞队列可以免去自己实现同步机制的麻烦,从而非常方便地实现。一个极简的示例如下:

public class ProducerConsumerExample {
    private static final int BUF_CAPACITY = 16;

    public static void main(String[] args) {
        BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<>(BUF_CAPACITY);

        Thread producerThread = new Thread(() -> {
            try {
                while (true) {
                    long value = System.currentTimeMillis() % 1000;
                    blockingQueue.put(value);
                    Thread.sleep(value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "producer");

        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    System.out.println(blockingQueue.take());
                    Thread.sleep(System.currentTimeMillis() % 1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "consumer");

        producerThread.start();
        consumerThread.start();
    }
}

一个小(?)问题

在上面的代码(以及j.u.c包中很多类的代码)的方法体中,经常能看到类似以下的语句:

        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;

也就是有些类中定义的字段,在方法中使用时会先赋值给一个局部变量。这样做到底是为了什么?以目前我所了解到的而言,还没有特别确切的答案,但可以确定是一个非常微小的优化,与JVM及缓存有关。

以下是reference传送门:

顺便,StackOverflow最近(不知道是哪一天)改版成了1998年的样式,满满的怀旧感。上面concurrency-interest邮件列表中关于这个问题也是众说纷纭,如果仔细爬楼还会发现Doug Lea本人的回复,不过有些令人费解。

相关文章

网友评论

    本文标题:以LinkedBlockingQueue为例浅谈阻塞队列的实现

    本文链接:https://www.haomeiwen.com/subject/qqmvbqtx.html