美文网首页
深入理解Condition

深入理解Condition

作者: 彳亍口巴 | 来源:发表于2020-07-25 13:12 被阅读0次

可重入锁(ReentrantLock)是 synchronized 关键字的扩展,更加灵活。还有一种ReentrantLock应用场景是和Condition搭配使用,实现多线程环境下等待状态条件的功能。Object.wait 和 Object.notify 是和 synchronized 配合使用的,条件变量Condition.await 和 Condition.signal 是和ReentrantLock相关联的。


一个简单Demo

先通过一个Demo看看怎么使用Condition,这里主线程通知条件满足,另一个线程继续运行,可以看到的是Condition.await/signal方法需要和一个ReentrantLock绑定。

public class ReenterLockCondition implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            condition.await();
            System.out.println(String.format("条件满足,线程%s运行!", Thread.currentThread().getName()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    public static void main(String args[]) throws InterruptedException {
        ReenterLockCondition reenterLockCondition = new ReenterLockCondition();
        Thread thread1 = new Thread(reenterLockCondition);
        thread1.setName("T1");
        thread1.start();
        Thread.sleep(2000);
        System.out.println("通知T1条件满足");
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

JDK并发包中的 ArrayBlockingQueue 就使用了Condition来同步队列的空/满状态。先看条件变量的定义:

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

ArrayBlockingQueue 构造的时候初始化了2个条件变量:非空,非满,他们都是和同一个重入锁关联的。

元素入队的时候,如果队列一直满的话就一直阻塞等待非满条件为真,否则的话就插入对应的元素,并且通知非空条件为真。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

同样的,出队的时候,如果队列没有元素就等待非空的条件为真,否则出队,并通知非满条件为真。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

实现分析

接下来看看条件变量是怎么实现的?可重入锁关联的条件变量实现类是AQS内部类ConditionObject,接下来通过其中的await和signal方法的源码看看Condition的实现。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    
    // 释放关联的锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    
    // 如果该节点没有放入同步队列(sync queue),则阻塞等待
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 节点进入了同步队列,则争用锁,锁获取成功后,await就退出了
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// 新增一个条件节点
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

public final void signal() {
    // 先判断当前线程是否获取了锁,否则异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// 摘除头节点,把头节点移到同步队列
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
     // CAS 将状态设置为0,0是一个无效的状态
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    
    // 唤醒线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

// 这个方法是AQS中的,node进入同步队列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

引用(本文章只供本人学习以及学习的记录,如有侵权,请联系我删除)

深入理解条件变量Condition

相关文章

网友评论

      本文标题:深入理解Condition

      本文链接:https://www.haomeiwen.com/subject/mcuwlktx.html