美文网首页juc
ReentrantLock 源码解读

ReentrantLock 源码解读

作者: 程序员阿奇 | 来源:发表于2020-07-06 13:46 被阅读0次

ReentrantLock 它是一个支持公平和非公平的可重入的锁。这些特性都是怎么实现的呢?如下代码:

public class ReentrantLockDemo {

   private static ReentrantLock lock = new ReentrantLock();

   private static volatile int num = 0;

   public static void inc() {
       lock.lock();
       try {
           num+=1;
       }catch (Exception e){
           e.printStackTrace();
       }
       finally {
           lock.unlock();
       }
   }

   public static void main(String[] args) throws InterruptedException {

       for (int i = 0;i<1000;i++){
           new Thread(()->
               inc()
           ).start();
       }
       // 不休眠 主线程 执行完 可能 其他线程 还未执行完 会造成结果和预想结果不一致
       Thread.sleep(1000);
       System.out.println(num);
   }
}

以上代码实现了在多线程情况下对一个数的累加。通过以上代码可知当 线程去执行 num+=1;时 需要先获得锁,操作结束后 再释放锁。那获得锁的操作时如何实现的呢?代码如下:

    public void lock() {
        sync.lock();
    }

以上代码可知,最终调用的是 sync.lock();,那 sync 是什么呢?继续查找代码:

  abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        abstract void lock();

Sync 是 ReentrantLock 的一个内部类 并且 继承了 AbstractQueuedSynchronizer 重写了 父类的一些方法,但是还是没找到 lock() 的实现,继续在源码中查找,发现有两种实现:

非公平锁的实现:

   // 非公平锁的实现:
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

公平锁的实现:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

那示例代码调用的是哪一个实现呢?经过代码的分析,发现 原来在构造 锁对象的时候 可以通过 构造参数 来指定锁的公平性,示例代码调用的是 无参构造:代码如下:

    // 无参构造
    public ReentrantLock() {
        sync = new NonfairSync();
    } 
    // 有参构造
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

通过以上代码可知,我所创建的对象是一个非公平锁,也就是说 sync.lock() 调用的是 非公平锁的实现,即:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }

通过以上代码分析,可知 线程会先通过 CAS 去执行 compareAndSetState(0,1) 如果结果为 true 则说明当前线程可以 获得锁,并 执行 setExclusiveOwnerThread(Thread.currentThread()); 通过方法名可知:设置一个独占锁 所属线程。也就说 当前线程获得了一个独占锁。代码如下:

  /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

以上代码 是Sync父类 AbstractOwnableSynchronizer 的一个成员变量 主要用于保存一个获得独占锁的线程。获得锁的线程 就可以继续执行run() 方法了。当线程执行完毕 就 调用 unlock();

    public void unlock() {
        sync.release(1);
    }

    public void unlock() {
        sync.release(1);
    }

    // 父类的默认实现,子类并没有重写
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

通过以上代码发现,调用的是父类的实现。示例代码 是创建了1000个线程,并操作同一个方法。每个线程都要去获得锁,那些没有或的锁的线程怎么处理呢?通过以上代码发现,没有获得锁的线程会执行 acquire(1); 方法,找到对应的实现如下(子类并没有覆写父类方法 所以会调用父类的实现):

  // 父类的实现
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上代码 可知 会先判断 尝试获得锁是否成功,并且添加一个独占的Node 到队列中是否成功,如果入队成功 并且尝试获得锁失败,则会继续执行。尝试获得锁的实现如下(子类重写了父类的实现):

   protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
   }

  /**
    * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        // 获取当线程
        final Thread current = Thread.currentThread();
        // 获取当前状态
        int c = getState();
          // 如果当前状态 等于0,
          f (c == 0) {
            // 并通过 CAS 来判断 能否获得锁
            // 如果 CAS 返回true ,则说明可以获得锁,并保存当前线程,返回 true,
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 判断当前线程 是否和缓存的线程为同一个线程,如果为同一个线程
        else if (current == getExclusiveOwnerThread()) {
           // 则下一个状态 为 当前 state + 1;
            int nextc = c + acquires;
            //  如果 nextc < 0 抛异常
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // 否则设置 状态 为  nextc = c + acquires
            setState(nextc);
            return true;
        }
        return false;
    }

通过以上代码可知 acquires 的值 为1,通过分析 首先获取当线程 然后获取 state 的值, 并判断 state 是否等于0,如果state 等于0,则说明当前线程可以尝试获得锁,如果 CAS 成功,则当前线程获得锁,否则 return,如果当前 state 不等于 0 ,则判断当前线程是否和 已经缓存的线程 是否为同一个线程,如果为同一个线程,则判断 当前状态 加 一并且 判断 是否小雨0,如果不小于0,则设置state 为 nextc 并返回true ,否则 返回false. 由 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 可知 只有
!tryAcquire(arg)为 true 时 也是 tryAcquire(arg) 为false 时 才能继续执行,那什么请款下 会为false 呢?
通过以上分析可知:
1. state 等于0 并且 CAS 失败 返回false
2. state 不等于0 并且 当前线程和已经获得锁的线程不是同一个线程 返回false.
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 什么情况下为true呢?该代码由两个方法组成:
1.Node addWaiter(Node.EXCLUSIVE)
2.boolean acquireQueued(final Node node, int arg)

首先我们先分析 addWaiter,找到对应的实现如下(调用父类的默认实现):

    private Node addWaiter(Node mode) {
        // 创建一个Node 并保存当前线程
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

通过分析以上代码:
1.创建一个mode 为 EXCLUSIVE的节点。
2.获取 tail``,并pred引用指向 该节点。 3.判断该引用指向的节点是否为空? 如果不为空,则将新创建的节点node的前置节点指向pred,并通过CAS 设置 该节点为tail节点. 如果设置成功,则将pred所指向的节点的next节点 指向新创建的节点。并返回创建的节点node. 如果设置失败或者tail为空,则执行 入队操作enq(node),代码如下:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

通过分析以上代码,可知通过自旋,首先获取 tail节点,然后判断 该节点是否存在,如果不存在,则通过CAS 创建一个 head节点,并将 tail节点指向 head节点。如果不为空,则直接将新创建的节点的前置节点指向tail节点,并通过CAS 设置 该节点为 tail 节点,如果设置成功,则将 t 所指向的节点的next引用指向创建的 node 节点,并返回 t. 继续分析

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取该节点的前置节点
                final Node p = node.predecessor();
                // 如果该节点前置节点 p 为  head 节点 并且 state 等于 0.
               // 则 设置该节点前置节点 为 head 节点,并将 next 所指向的节点 设置为null. 则返回 false
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
               // 如果 
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
   // 获取节点的前置节点
    final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
   // 设置 head节点
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

相关文章

网友评论

    本文标题:ReentrantLock 源码解读

    本文链接:https://www.haomeiwen.com/subject/pdteqktx.html