美文网首页Java
ReentranLock源码浅析

ReentranLock源码浅析

作者: 小金狗子 | 来源:发表于2020-01-21 14:47 被阅读0次

    本文为作者个人理解难免有误,欢迎指正

    ReentranLock 的基本构成

    
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer{...}
    
    static final class NonfairSync extends Sync{...}
    
    static final class FairSync extends Sync{...}
    
    

    常用的lock方法具体实现是在FairSync和NonfairSync。Nonfaire体现在lock时尝试直接获取锁,如果获取失败,则使用和公平锁一样通过AbstractQueuedSynchronizer的acquire去加锁。

    加锁

    
    final void lock() {
    
          if (compareAndSetState(0, 1))
    
            setExclusiveOwnerThread(Thread.currentThread());
    
          else
    
            acquire(1);
    
    }
    
    

    对于锁的获取实质上是改变AbstractQueuedSynchronizer中state属性的值。
    改变的过程大致是将state属性在的java内存中的偏移映射为本地内存地址,再通过Atomic::cmpxchg

    改值。这些都是native方法,在java层面上可以不关心。简单点讲,本例中我们只需要记住所谓的加锁就是将state值从0改为1。

    AbstractQueuedSynchronizer是核心!!!!

    到这里,我们需要先看下AbstractQueuedSynchronizer的基本结构。

    
        private transient volatile Node head;
    
        /**
    
        * Tail of the wait queue, lazily initialized.  Modified only via
    
        * method enq to add new wait node.
    
        */
    
        private transient volatile Node tail;
    
        /**
    
        * The synchronization state.
    
        */
    
        private volatile int state;
    
    
    
    

    Node组成双向链表,在源码中的称呼为CLH队列。

    
            +------+  prev +-----+       +-----+
            | head | <---- |     | <---- | tail|  
            |      | ----> |     | ----> |     |
            +------+  next +-----+       +-----+
    
    

    Node的结构

    
    /** Marker to indicate a node is waiting in shared mode */
    
            static final Node SHARED = new Node();
    
            /** Marker to indicate a node is waiting in exclusive mode */
    
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
    
            static final int CANCELLED =  1;
    
            /** waitStatus value to indicate successor's thread needs unparking */
    
            static final int SIGNAL    = -1;
    
            /** waitStatus value to indicate thread is waiting on condition */
    
            static final int CONDITION = -2;
    
            /**
    
            * waitStatus value to indicate the next acquireShared should
    
            * unconditionally propagate
    
            */
    
            static final int PROPAGATE = -3;
    
            volatile int waitStatus;
    
    
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
    
    
            Node nextWaiter;
    
    

    因为是个双向链表,所以自然有前驱和后继。thread表示尝试加锁的线程。waitStatus的值从-3到1,默认值0(下文用DEFAULT表示)

    在本例中我们聚焦SIGNAL。它表示被标记为SIGNAL的节点的后继结点(的线程)需要被唤起!

    acquire方法的实现

    
    if (!tryAcquire(arg) &&
    
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    
                selfInterrupt();
    
    

    首先看下tryAcquire方法.以FairSync为例,对于NonfairSync,对比代码发现就少了!hasQueuedPredecessors()这个逻辑

    
    final Thread current = Thread.currentThread();
    
                int c = getState();
    
                if (c == 0) {
    
                    if (!hasQueuedPredecessors() &&
    
                        compareAndSetState(0, acquires)) {
    
                        setExclusiveOwnerThread(current);
    
                        return true;
    
                    }
    
                }
    
                else if (current == getExclusiveOwnerThread()) {
    
                    int nextc = c + acquires;
    
                    if (nextc < 0)
    
                        throw new Error("Maximum lock count exceeded");
    
                    setState(nextc);
    
                    return true;
    
                }
    
                return false;
    
    

    先抛开hasQueuedPredecessors(),从代码来看
    如果没获得锁,就做加锁操作,已经获得锁,就对当前state 加acquires(本例中为1,即+1)

    为了略微全面一些,我们以公平锁为例.

    没有人持有锁时,head和tail都为null,所以也不会有Queued Predecessors,tryAcquire结果为true,acquire直接返回,加锁成功结束。

    相同线程执行lock,即锁重入时,对state做递增操作(+1)并state赋值。

    如果是不同线程的话,就需要继续执行acquireQueued

    先看addWaiter方法

    
    private Node addWaiter(Node mode) {
    
            Node node = new Node(Thread.currentThread(), mode);
    
            // Try the fast path of enq; backup to full enq on failure
    
            Node pred = tail;
    
            if (pred != null) {
    
                node.prev = pred;
    
                if (compareAndSetTail(pred, node)) {
    
                    pred.next = node;
    
                    return node;
    
                }
    
            }
    
            enq(node);
    
            return node;
    
        }
    
    
    
    private Node enq(final Node node) {
    
            for (;;) {
    
                Node t = tail;
    
                if (t == null) { // Must initialize
    
                    if (compareAndSetHead(new Node()))
    
                        tail = head;
    
                } else {
    
                    node.prev = t;
    
                    if (compareAndSetTail(t, node)) {
    
                        t.next = node;
    
                        return t;
    
                    }
    
                }
    
            }
    
        }   
    
    

    创建了一个Node,Node是加锁线程的代,我们取名N1。由于head和tail都为null,进入enq方法。首先创建一个Node(取名N0)初始化head和tail,初始化后如下图,

    
            +------+
    
            | head |
    
            | tail |
    
            +------+
    
    

    继续循环,走入else后,N1是tail,N0是head

    
            +------+        +------+
    
            |  N0  | <----  |  N1  |
    
            | head | ---->  | tail |
    
            +------+        +------+
    
    

    加入成功后就会执行

    
    final boolean acquireQueued(final Node node, int arg) {
    
            boolean failed = true;
    
            try {
    
                boolean interrupted = false;
    
                for (;;) {
    
                    final Node p = node.predecessor();
    
                    if (p == head && tryAcquire(arg)) {
    
                        setHead(node);
    
                        p.next = null; // help GC
    
                        failed = false;
    
                        return interrupted;
    
                    }
    
                    if (shouldParkAfterFailedAcquire(p, node) &&
    
                        parkAndCheckInterrupt())
    
                        interrupted = true;
    
                }
    
            } finally {
    
                if (failed)
    
                    cancelAcquire(node);
    
            }
    
        }
    
    

    从上图的结构看,node的前驱就是head,但是tryAcquire是不成功的,因为已经被别的线程加锁了,所以走shouldParkAfterFailedAcquire()

    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    
            int ws = pred.waitStatus;
    
            if (ws == Node.SIGNAL)
    
                /*
    
                * This node has already set status asking a release
    
                * to signal it, so it can safely park.
    
                */
    
                return true;
    
            if (ws > 0) {
    
                /*
    
                * Predecessor was cancelled. Skip over predecessors and
    
                * indicate retry.
    
                */
    
                do {
    
                    node.prev = pred = pred.prev;
    
                } while (pred.waitStatus > 0);
    
                pred.next = node;
    
            } else {
    
                /*
    
                * waitStatus must be 0 or PROPAGATE.  Indicate that we
    
                * need a signal, but don't park yet.  Caller will need to
    
                * retry to make sure it cannot acquire before parking.
    
                */
    
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    
            }
    
            return false;
    
        }
    
    

    刚开始,我们创建的Node和默认新增的都是没有设置waitStatus的,所以N0(pred)和N1(node)都默认值0。

    经过这段代码,实质上更改waitStatus的值

    
            +------+        +------+
    
            |  N0  | <----  |  N1  |
    
            |SIGNAL| ---->  |DEFAULT|
    
            +------+        +------+
    
    

    此时acquireQueued继续循环,shouldParkAfterFailedAcquire走入第一个if返回true,执行parkAndCheckInterrupt,线程就会park在LockSupport.park(this);

    
        private final boolean parkAndCheckInterrupt() {
    
            LockSupport.park(this);
    
            return Thread.interrupted();
    
        }
    
    

    如果别的线程继续加锁

    
            +------+        +------+      +------+        +------+
    
            |  N0  | <----  |  N1  | <---- | Nn-1 | <----  |  Nn  |
    
            |SIGNAL| ---->  |SIGNAL| ----> |SIGNAL| ---->  |DEFAULT|
    
            +------+        +------+      +------+        +------+
    
    

    锁释放

    
        public final boolean release(int arg) {
    
            if (tryRelease(arg)) {
    
                Node h = head;
    
                if (h != null && h.waitStatus != 0)
    
                    unparkSuccessor(h);
    
                return true;
    
            }
    
            return false;
    
        }
    
    
    
        protected final boolean tryRelease(int releases) {
    
                    int c = getState() - releases;
    
                    if (Thread.currentThread() != getExclusiveOwnerThread())
    
                        throw new IllegalMonitorStateException();
    
                    boolean free = false;
    
                    if (c == 0) {
    
                        free = true;
    
                        setExclusiveOwnerThread(null);
    
                    }
    
                    setState(c);
    
                    return free;
    
        }
    
    

    首先对在tryRelease中清除当前持有锁的线程信息,更改state的值。

    
        private void unparkSuccessor(Node node) {
    
            /*
    
            * If status is negative (i.e., possibly needing signal) try
    
            * to clear in anticipation of signalling.  It is OK if this
    
            * fails or if status is changed by waiting thread.
    
            */
    
            int ws = node.waitStatus;
    
            if (ws < 0)
    
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
    
            * Thread to unpark is held in successor, which is normally
    
            * just the next node.  But if cancelled or apparently null,
    
            * traverse backwards from tail to find the actual
    
            * non-cancelled successor.
    
            */
    
            Node s = node.next;
    
            if (s == null || s.waitStatus > 0) {
    
                s = null;
    
                for (Node t = tail; t != null && t != node; t = t.prev)
    
                    if (t.waitStatus <= 0)
    
                        s = t;
    
            }
    
            if (s != null)
    
                LockSupport.unpark(s.thread);
    
        }
    
    

    unparkSuccessor主要是将当前节点的值归0,然后唤醒下一个节点。

    
            +------+        +------+
    
            |  N0  | <----  |  N1  |
    
            |DEFAULT| ----> |DEFAULT|
    
            +------+        +------+
    
    

    当下一节点的被唤醒时,原来part在LockSupport.park(this)上的线程就开始继续执行acquireQueued中的for循环了。重点看一下

    
    final Node p = node.predecessor();
    
                    if (p == head && tryAcquire(arg)) {
    
                        setHead(node);
    
                        p.next = null; // help GC
    
                        failed = false;
    
                        return interrupted;
    
                    }
    
    

    N1的前驱还是head,并且因为锁以释放,在公平锁的情况下,能尝试加锁成功。那么原本的N0节点就被移除了,N1就变成了HEAD,同时也是tail。Java

    相关文章

      网友评论

        本文标题:ReentranLock源码浅析

        本文链接:https://www.haomeiwen.com/subject/pnjezctx.html