美文网首页Java并发编程
Java并发编程 - 深入剖析ReentrantLock之非公平

Java并发编程 - 深入剖析ReentrantLock之非公平

作者: HRocky | 来源:发表于2019-03-13 15:50 被阅读15次

这篇文章不是讲ReentrantLock的使用,而是通过调试,分析其实现原理。

非公平锁

测试代码如下:

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {

    private ReentrantLock lock = new ReentrantLock();

    public void doSomething() {

        lock.lock();

        try {
            System.out.println(Thread.currentThread().getName() + " has been acquired the lock...");
        } finally {
            lock.unlock();
            System.out.println(Thread.currentThread().getName() + " has been released the lock...");
        }

    }

    public static void main(String[] args) {

        ReentrantLockExample lockExample = new ReentrantLockExample();

        Thread A = new Thread(()->{
            lockExample.doSomething();
        }, "thread-A");

        Thread B = new Thread(()->{
            lockExample.doSomething();
        }, "thread-B");

        Thread C = new Thread(()->{
                lockExample.doSomething();
            }, "thread-C");

        Thread D = new Thread(()->{
                lockExample.doSomething();
        }, "thread-D");

        A.start();
        B.start();
        C.start();
        D.start();
    }
}

代码测试的是4个线程争抢一把锁。

1. 加锁流程解析

通过IDEA多线程调试工具进行调试,调试模拟情景为:A-B-C-D依次获取锁,等所有的获取请求结束,再依次释放锁。

第一步:线程A请求锁

调用ReentrantLock的lock方法:

public void lock() {
    sync.lock();
}

执行到此出,将处理交给sync,我们这里是非公平锁,则sync为NonfairSync,调用方法:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
      acquire(1);
}

通过CAS设置state变量,期望当前值是0,更新值是1。state初始值为0,无其他线程更改过,所以这里CAS设置成功,state值变为1。并且exclusiveOwnerThread设置为thread-A对象。

执行图:

thread-A加锁.png

lock对象内容:

thread-A加锁后ReentrantLock对象内容.png

第二步:线程B请求锁

挂起A线程,执行B线程。

线程B执行到此代码:

ReentrantLock.java

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

此时state=1,此处CAS执行失败,接下来执行else语句代码。

acquire方法在AbstractQueuedSynchronizer类中定义:

AbstractQueuedSynchronizer.java

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
       acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
}

if条件中有两处,若第一次为false,则第二处会执行。

先来看tryAcquire, tryAcquire在AbstractQueuedSynchronizer重定义,但是没有定义具体实现,具体实现交给子类,我们这里就是NonfairSync类实现。

NonfairSync

 protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

nonfairTryAcquire在NonfairSync直接父类也就是Sync中定义:

Sync

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

此时state=1,exclusiveOwnerThread=thread-A, current = thread-B。此方法内逻辑不执行,返回false。

返回执行acquireQueued方法。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

执行这个方法前要先执行addWaiter方法,此方法用于创建即将入同步队列的Node(节点)。

AbstractQueueSynchronizer.java

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

此时tail = null,执行enq方法。

AbstractQueueSynchronizer.java

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • 第一次循环:创建一个Node对象,设置给head。同时将head赋值给tail,这样head和tail指向同一个对象。两者不再为null。
head_tail-0.png
  • 第二次循环:将B节点的前置设置为t(t是中间变量指向的就是head/tail节点),然后将B设置为tail。执行完成后返回t,此时t和head执行同一个对象,其实返回的就是head节点。

执行完成后,head和tail的内容如下:

head_tail-1.png

注意一下,虽然addWaiter方法返回的是头节点,但是node还是指向的B节点。接着执行acquireQueued方法。

AbstractQueuedSynchronizer.java

 final boolean acquireQueued(final Node node, int arg) {// Node此时是B节点
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • 第一次循环:p为head节点,tryAcquire和前面一样返回false,第一个if语句不行。紧接着执行shouldParkAfterFailedAcquire方法,方法代码如下:

AbstractQueuedSynchronzier.java

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

此时pred=head, node = B, ws = 0, 执行:

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

这个执行完成后,head的waitStatus被设置成Node.SINGAL,这个表示head节点的后继者也就是B节点代表的线程需要被通知。返回false。

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    interrupted = true;

if代码块不执行。

  • 第二次循环: p还是head节点,tryAcquire和第一次循环一样,第一个if依然不执行,继续执行shouldParkAfterFailedAcquire,此时shouldParkAfterFailedAcquire方法内pre是head节点,而head节点经过第一次循环waitStatus被设置为-1。shouldParkAfterFailedAcquire(p, node)返回true, 则接下来执行parkAndCheckInterrupt()。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程B执行到LockSupport.park(this)处被挂起。

执行图:

thread-B加锁.png

第三步:线程C请求锁

注意:此时线程A是我们主动(通过IDE操作)挂起的(RUNNING),线程B通过上一步的操作被动(park操作)挂起的(WAITTING)。

因为前面已经见到一些逻辑的执行了,这里就不再重复,直接讲关系到Node的操作。

通过addWaiter操作,为线程C创建一个Node节点:

AbstractQueuedSynchronzier.java

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

此时tail为B节点,然后将C节点的prev设置为B,通过CAS操作将C节点设置为tail,设置成功将B的next设置为C。然后直接返回C节点。

执行完成后head和tail的内容如下:

head_tail-2.png

addWaiter返回C节点后,执行acquireQueued方法:

AbstractQueuedSynchronzier.java

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

此时node=C

  • 第一次循环: C的前置节点是B,这里p=B,第一个if不执行,执行

shouldParkAfterFailedAcquire:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

pre=B,B的waitStatus=0,则执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL),B的waitStatus被设为-1,表示后继节点,即C节点所代表的线程需要被唤醒。继续执行线程C被挂起。

执行图:

thread-C加锁.png

第四步:线程D请求锁

线程D请求锁和上面的流程几乎是一样了,这里就不再分别调试,执行图:

thread-D加锁.png

相关文章

网友评论

    本文标题:Java并发编程 - 深入剖析ReentrantLock之非公平

    本文链接:https://www.haomeiwen.com/subject/tmneuqtx.html