美文网首页程序员
Java - LinkedBlockingQueue的阻塞实现

Java - LinkedBlockingQueue的阻塞实现

作者: 夹胡碰 | 来源:发表于2021-01-01 21:09 被阅读0次

    LinkedBlockingQueueBlockingQueue的链表实现,他的阻塞体现在puttake方法上,下面将通过源码介绍如何LinkedBlockingQueue是如何实现的阻塞队列。

    1. ReentrantLock+Condition

    通过AQS构建的ReentrantLockCondition实现了puttake的锁与线程挂起/唤醒模型

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    
    2. put阻塞

    1)首先加put
    2)当容量满时(count.get() == capacity),执行notFull.await();挂起当前线程并释放锁,相当于Object.wait
    3)当容量从0变成1时(if c == 0),执行signalNotEmpty();唤醒一个take挂起线程,相当于Object.notify

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly(); // put锁
        try {
            while (count.get() == capacity) {
                notFull.await(); // 容量满时,挂起线程释放锁,并加入等待队列
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // 当容量从0变成1时,唤醒一个take挂起的线程
    }
    
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();// 唤醒一个take挂起的线程
        } finally {
            takeLock.unlock();
        }
    }
    
    3. take阻塞

    1)首先加take
    2)当容量为0时(count.get() == 0),执行notEmpty.await();挂起当前线程并释放锁,相当于Object.wait
    3)当容量从满容量减少1时(if c == capacity),执行signalNotFull();唤醒一个put挂起线程,相当于Object.notify

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); // take锁
        try {
            while (count.get() == 0) {
                notEmpty.await(); // 当队列中没有对象时,挂起线程释放锁,并加入等待队列
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull(); // 当容量从满容量减少1时,唤醒一个put挂起的线程
        return x;
    }
    
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal(); // 唤醒一个put挂起的线程
        } finally {
            putLock.unlock();
        }
    }
    
    4. 拓展: offer(timeout) 与 poll(timeout)实现原理

    实现与put/take基本一样,只不过底层调用了LockSupport.parkNanos/LockSupport.unparkNanos

    • wait
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this); // 基础park
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    • wait timeout
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout); // timeout park
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
    

    相关文章

      网友评论

        本文标题:Java - LinkedBlockingQueue的阻塞实现

        本文链接:https://www.haomeiwen.com/subject/sooqoktx.html