美文网首页
LinkedBlockingQueue分析

LinkedBlockingQueue分析

作者: gczxbb | 来源:发表于2018-05-24 18:37 被阅读1次

LinkedBlockingQueue内部结构采用链式存储,元素节点Node,封装对象E泛型,Node#next指向下一个节点。

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
存储结构图。 阻塞队列LinkedBlockingQueue链式存储结构.jpg

LinkedBlockingQueue#构造方法。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

参数说明
capacity:容量,默认是Integer.MAX_VALUE。
count:当前队列元素数量,AtomicInteger线程同步,可根据count与capactiy比较值作为是否已满的依据。
head/last指针:初始化创建第一个Node节点作为头结点与尾节点,封装内容是null。


LinkedBlockingQueue插入节点

put/offer方法:向尾部插入元素,加锁putLock。区别是,若队列已满,put方法线程阻塞等待,当有空余位置时,被唤醒,继续插入,offer直接返回插入失败。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();//以下代码段加锁
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node); // 到这里插入,说明有空余至少一个。
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();//若还有至少一个空余,通知其他等待的不满信号。
    } finally {
        putLock.unlock();//解锁
    }
    if (c == 0)
        signalNotEmpty();
}
  • 插入对象不能是空。

  • 封装E对象Node节点。

  • 线程并发,一定要ReentrantLock加锁,put锁,确保其他线程此刻无法访问上锁的代码段。

  • 判断count,count是AtomicInteger类型,并发保持同步。若达到最大容量capacity,此时线程将阻塞,让出锁,notFull不满条件等待,即等待不满信号唤醒。

  • 若未达到最大容量,在队列尾部enqueue插入Node节点。

  • count自增,返回的c仍是count自增前的值,若+1还小于capacity,说明此次插入后还有空余空间,发出未满通知信号。
    这里为什么会发出一个notFull未满信号呢?
    线程A1执行put方法时,获取putLock锁,其他线程无法获取该锁,当A1线程发现队列已满,在notFull上等待,出让putLock锁,线程A2获取该锁,同样在notFull上等待,此刻,存在多个阻塞的插入线程A1.2.3...都在notFull#await导致阻塞,出让putLock锁,在notFull上等待。
    直到take/poll方法获取数据,队列出现空位,signalNotFull执行notFull#signal方法,在notFull上有线程在等待,唤醒A1,signal只唤醒notFull上的一个线程。A1抢到putLock锁,执行元素插入,此刻其他线程仍休眠。enqueue插入成功,再次判断是否有多余容量,若有,notFull#signal继续唤醒在notFull上休眠的其他线程。若没有notFull上的等待线程,执行signal也没有影响,重点在于,它会继续唤醒其他notFull等待的线程。

  • ReentrantLock解锁

  • 若c==0,说明插入之前队列是空,插入后不空,一定要发出不空信号NotEmpty,让阻塞在NotEmpty上的读取线程唤醒。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

offer方法比较简单,判断count未达到最大容量capactiy,插入节点。同理,根据空余空间发出未满信号。c的初始值是-1,若成功插入,则一定会赋值成一个>=0的值,因此。offer的返回的值可判断是否成功插入,offer不会阻塞。


LinkedBlockingQueue取出节点

take/poll方法:从队列头部加锁takeLock获取元素。若队列为空,take方法获取线程阻塞等待,有元素时被唤醒。poll返回失败。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

分析逻辑与put一致。

生产者与消费者模型,元素在头部获取,尾部插入。多线程并发操作时,采用两种锁保证线程同步,互相独立,并行操作队列元素,数量更新AtomicInteger同步。


任重而道远

相关文章

网友评论

      本文标题:LinkedBlockingQueue分析

      本文链接:https://www.haomeiwen.com/subject/uqsudftx.html