美文网首页
ReentrankLock源码阅读

ReentrankLock源码阅读

作者: kunkkazyc | 来源:发表于2018-06-18 19:45 被阅读0次

ReentrantLock是java.util.concurrent.locks包里的一个lock实现,顾名思义该锁是可重入锁,可重入的最大次数为Integer.Max,超出则抛出Error。

ReentrantLock内聚一个基于AbstractQueuedSynchronizer的同步器,Lock接口里的方法都是基于这个同步器实现的。同步器有两个内部实现,一个是公平同步器,一个是不公平同步器。公平性即等待锁时间越长的线程获取锁的优先级越高,但ReentrantLock默认是采用不公平同步器,我的理解是大多应用场景无需考虑公平性,也就是线程因竞争锁多阻塞一段时间是可接受的。只要对锁的使用得当,每个线程最终都能成功获取锁。公平性通过构造方法也是可以自行指定的。

AbstractQueuedSynchronizer(简称AQS)同样如同其名,是一个抽象的基于CLH队列的同步器,它定义了一套多线程访问共享资源的同步器框架。许多其它的同步类实现也都依赖于它,如常用的Semaphore、CountDownLatch等。同时,它提供两种模式的锁抽象:共享模式和独享模式;共享模式类似读写锁。Semaphore和CountDownLatch是共享模式的实现,而ReentrantLock是独享模式的实现,因此本文只阅读有关独享模式的代码。

ReentrantLock除了实现了Lock接口外,还定义了一系列的public和protected方法,用于检测和监控lock的state,如public getHoldCount()获取重入次数,protected getQueuedThreads()获取因竞争锁而阻塞的线程名称列表。

本文主要分析公平性和Lcok接口的方法

ReentrantLock和AQS在concurrent库中的package路径

为了达到更好的阅读效果,可先思考如下问题,然后带着问题阅读该文和源码~

编号 问题描述
1 ReentrantLock如何实现重入的?如何维护重入的数值?
2 ReentrantLock的公平性和非公平性体现在哪些指标上,如何实现的?
3 多个线程同时调用lock()方法,一个线程会成功,其他线程会阻塞,阻塞是如何实现的?即lock()的工作原理?
4 调用unlock()方法将会唤醒一个因lock()而阻塞的线程,唤醒如何实现的? 即unlock()的工作原理?
5 Lock接口类中lock()、lockInterruptibly()、tryLock()这三个方法的异同?
6 Condition类作用、工作原理以及如何使用?
7 AQS的大致工作原理?CLH队列是如何工作的?

1. ReentrantLock类结构

ReentrantLock类的内部结构相对还是比较简单的:

  1. 两个内部类实现了两个基于AQS的同步器:一个是公平的, 一个是不公平的。
  2. 实现了Lock接口类,如lock、unlock,tryLock、lockInterruptibly等方法。
  3. 提供了获取ReentrantLock运行状态的各种方法,如getHoldCount()、getQueuedThreads()等。

不知道如何调整代码里注释的颜色,默认的颜色看着很费眼,哪位知道?欢迎评论区留言指点~

    /**
    * 内部类
    **/
    class Sync extends AbstractQueuedSynchronizer {...}
    class NonfairSync extends Sync {...}
    class FairSync extends Sync {...}
   /**
    * 成员变量
    **/
    Sync sync;

   /**
    * Lock接口里方法
    **/
    lock() {..}
    lockInterruptibly() {..}
    tryLock() {..}
    tryLock(long timeout, TimeUnit unit) throws InterruptedException {..}
    unlock() {..}
    newCondition() {..}
   /**
    * 其他监控方法,这里只列出其中两个,其它请查阅源码
    **/
    getHoldCount()  {..}
    getQueuedThreads()  {..}

2. 公平性与非公平性

ReentrantLock提供了两个构造方法,默认构造方法使用非公平同步器,因为大部分业务场景发生锁竞争时,未获取锁的线程多阻塞一会儿是可以接受的,既不要求实时也不要求有序,而且非公平同步器性能会好一些;使用者可通过构造方法指定公平性。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    //通过构造方法可以指定公平性
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

非公平同步器性能为啥好?阅读lock()源码时,会回答这个问题。我们先继续阅读公平同步器和非公平同步器有哪些不同?通过源码对比,在获取锁的执行流程中都会执行tryAcquire,公平性的tryAcquire只比非公平性的tryAcquire多了一个判断条件!hasQueuedPredecessors(),仅此一行的区别。这个区别就是,非公平性的实现在竞争锁时,无需判断阻塞队列里是否已经有其他线程也在竞争锁,直接插队拿锁走人~

     /**
         * 公平版本的tryAcquire.  只有当没有阻塞线程或队列为空时,可以继续申请锁
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        /**
         * 非公平版本的tryAcquire。
         * 这个实现不是在NonfairSync类里,而是在Sync里
         * 因为无论当前使用哪种同步器,Sync里的tryLock()都是调用的nonfairTryAcquire(1)
         * 因此把nonfairTryAcquire的实现放到基类Sync里
         */
       final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

     public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

hasQueuedPredecessors方法里的tail和head是AQS里的成员变量,下个章节会介绍AQS如何维护tail、head和CLH队列的。下面列出一个关于ReentrantLock公平性的测试程序,通过测试结果可以看出公平锁是按照锁的申请次序依次分配的,而非公平锁则是随时可能被强占的。
非公平性锁的测试结果如下图,当阻塞队列里有线程时,新申请锁的线程会直接插队抢走锁。


非公平性的锁的测试结果

公平性锁的测试结果如下图,当阻塞队列里有线程时,新申请锁的线程会追加到队列的尾部。


公平性锁的测试结果
测试的代码如下
public class FairOrUnfairLockTest {
    private Lock fairLock = new MyReentrantLock(true);
    private Lock unfairLock = new MyReentrantLock(false);

    /**
     * 公平锁测试
     */
    public void testFairLock() {
        for (int i = 0; i < 3; i++) {
            Thread thread = new MyThread(new Job(fairLock), i + "");
            thread.setName("" + i);
            thread.start();
        }
    }

    /**
     * 非公平锁测试
     */
    public void testUnfairLock() {
        for (int i = 0; i < 3; i++) {
            Thread thread = new MyThread(new Job(unfairLock), i + "");
            thread.start();
        }
    }

    public static void main(String[] args) {
        FairOrUnfairLockTest fairOrUnfairLockTest = new FairOrUnfairLockTest();

        fairOrUnfairLockTest.testFairLock();
//        fairTest.testUnfairLock();
    }

    public static class MyReentrantLock extends ReentrantLock {
        public MyReentrantLock(boolean fair) {
            super(fair);
        }

        @Override
        public String toString() {
            List<Thread> waitThreads = Lists.newArrayList(getQueuedThreads());
            Collections.reverse(waitThreads);
            Thread o = this.getOwner();
            return ((o == null) ?
                    "[Unlocked]" :
                    "[Locked by thread " + o.getName() + "]  ") +
                    "[Waited by threads " + waitThreads + "]";
        }
    }

    public static class MyThread extends Thread {

        public MyThread(Runnable target, String name) {
            super(target, name);
        }

        public String toString() {
            return getName();
        }
    }

    private static class Job implements Runnable {
        private Lock lock;

        public Job(Lock lock) {
            this.lock = lock;
        }

        public void run() {
            /**
             * 释放锁后立即再次申请锁
             */
            for (int i = 0; i < 2; i++) {
                lock.lock();
                System.out.println(lock.toString());
                try {
                    long start = System.currentTimeMillis();
                    while (System.currentTimeMillis() - start < 2 * 1000) {
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

3. lock执行流程

本文将以非公平锁的lock流程为例来分析lock执行流程。先看ReentrantLock类里的lock方法,只有一行代码,执行NonfairSync里的lock方法;

    public void lock() {
        sync.lock();
    }

接着看NonfairSync里的lock方法,如果锁可用直接拿走锁返回,否则调用AQS里的acquire方法。

    final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
    }

接着看AQS里的acquire方法。有两个判断条件,执行逻辑是:先尝试获取锁tryAcquire,成功则返回;如果失败,从获取插入阻塞队列acquireQueued;最后会调用自我中断;

   AQS :: public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

接着看NonfairSync里的tryAcquire方法,上个章节提到非公平性锁的性能会更好些。这里给出分析,tryAcquire在锁可用时,无需检查阻塞队列是否为空,会插队抢走锁。如果加上这个检查(公平同步器的实现),当阻塞队列不为空时,总会执行AQS里的acquireQueued方法,acquireQueued会创建阻塞节点,然后进入阻塞 > 唤醒流程,而这个流程要从用户态切换到内核态,相对比较耗时。正是因为非公平同步器很多锁申请可以跳过这个流程,因此性能会更好些。

  protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
  }
  final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
  }

上面已经提到acquireQueued会创建阻塞节点,然后进入阻塞 > 唤醒流程,代码如下:

 AQS :: public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
           // 创建阻塞节点并插入到阻塞队列,然后进入阻塞 > 唤醒流程
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
  }
  AQS :: final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 进入阻塞 > 唤醒流程
            for (;;) {    
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

执行acquireQueued时会执行addWaiter(Node.EXCLUSIVE), arg),它的作用是把当前线程加入到阻塞队列里;因此在分析acquireQueued方法前,先阅读addWaiter方法和Node数据结构。addWaiter方法相对简单,不做详细解读~

   AQS :: private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Node数据结构,其中waiterStatus最为关键,阻塞、唤醒、中断、取消时会更改这个值。

   AQS :: static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        //取消状态
        static final int CANCELLED =  1;
        //后继节点需要被唤醒 LockSupport.upark()
        static final int SIGNAL    = -1;
        //处在某个condition的wait状态 condition.await()
        static final int CONDITION = -2;
        // 共享模式会用到,这里略
        static final int PROPAGATE = -3;

       // 节点状态,上边已经给出简要的解释,建议阅读一下源码里详细注解;
        volatile int waitStatus;
        // 前驱
        volatile Node prev;
        // 后继
        volatile Node next;
         // 阻塞线程
        volatile Thread thread;
         // Link to next node waiting on condition, or the special value SHARED
        Node nextWaiter;
类CLH队列的AQS阻塞队列
class Parker : public os::PlatformParker {
  private:
  volatile int _counter ;
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association

  public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
}

class PlatformParker : public CHeapObj<mtInternal> {  
  protected:  
    pthread_mutex_t _mutex [1] ;  
    pthread_cond_t  _cond  [1] ;  
    ...  
}

class PlatformParker{
  void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }


  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }
}



lock()的执行时序图如下图。其中HotSpot JDK里的os_linux.hpp文件有个PlatformParker类,PlatformParker类里的的park()和uppark()方法提供操作系统级别的线程阻塞和唤醒;


lock()执行流程
lock执行流程2(转自https://blog.csdn.net/luonanqin/article/details/41871909)

4. tryLock执行流程

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
    }

5. unlock执行流程

   public void unlock() {
        sync.release(1);
   }

   AQS :: public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
           /**
            waitStatus!=0表示处于CANCEL(1) 或SIGNAL(-1) 或CONDITION(-2) 或CONDITION(-3)
           也就是说waitStatus不为零表示它的后继在等待唤醒。
            **/
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
    }

   AQS :: private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
unlock()执行流程(转自https://blog.csdn.net/luonanqin/article/details/41871909)

6. lockInterruptibly()执行流程

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

   

7. newCondition()

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

8. 总结

之前提到的7个问题是否有了答案?希望读者读了本文后能对ReentrantLock的使用有了更准确的把握~

编号 问题描述
1 ReentrantLock如何实现重入的?如何维护重入的数值?
2 ReentrantLock的公平性和非公平性体现在哪些指标上,如何实现的?
3 多个线程同时调用lock()方法,一个线程会成功,其他线程会阻塞,阻塞是如何实现的?即lock()的工作原理?
4 调用unlock()方法将会唤醒一个因lock()而阻塞的线程,唤醒如何实现的? 即unlock()的工作原理?
5 Lock接口类中lock()、lockInterruptibly()、tryLock()这三个方法的异同?
6 Condition类作用、工作原理以及如何使用?
7 AQS的大致工作原理?CLH队列是如何工作的?

参考:
lock和unlock https://blog.csdn.net/luonanqin/article/details/41871909
LockSupport park和unpark https://blog.csdn.net/hengyunabc/article/details/28126139
查看OpenJdk native方法实现 https://blog.csdn.net/kelindame/article/details/44625255
Unsafe类 https://www.cnblogs.com/pkufork/p/java_unsafe.html

本文为本简作者原创,无版权问题喜者可转~ 功力有限,有任何错误欢迎指正~

相关文章

网友评论

      本文标题:ReentrankLock源码阅读

      本文链接:https://www.haomeiwen.com/subject/siuneftx.html