美文网首页
ReentrantLock源码解读

ReentrantLock源码解读

作者: marsjhe | 来源:发表于2018-11-24 16:13 被阅读0次

    前言

    写这篇文章之前,还是先安利一本书:《java并发编程的艺术》。这本书对锁的实现的很多细节都解释的还是很清楚的,加上自己配合源码进行理解,读懂ReentrantLock这个类的实现应该不是那么困难。本文只对独占模式进行分析。


    一行行分析ReentrantLock源码

    直接步入正题,先贴一段代码看看如何使用ReentrantLock:

    public class ReentrantLockTest {
    
        public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock(true);  //1
            lock.lock();  //2
            try {
                //do something
            } finally {
                lock.unlock(); //3
            }
        }
    }
    
    ReentrantLock的构造

    上面代码的步骤1是调用ReentrantLock构造方法进行初始化,这里ReentrantLock给我们提供了两种锁的实现,一个是公平锁,一个是非公平锁。这两种锁顾名思义,一个排队干活,一个抢着干~~

    //默认构造函数,得到的是非公平锁的实现
     public ReentrantLock() {
           sync = new NonfairSync();
     }
    //传入true得到公平锁的实现,传入false则得到公平锁的实现
     public ReentrantLock(boolean fair) {
           sync = fair ? new FairSync() : new NonfairSync();
     }
    
    lock方法的解析

    ReentrantLock锁的使用的入口在lock方法,下面咱们针对公平锁lock方法的实现进行分析一波(能看懂这个相信对非公平锁的lock的实现的理解也就不会有什么难度了)。

    这里我把所有的方法都放在一起,方便大家阅读:

        //这里在并发情况下会有竞争
        final void lock() {
                acquire(1);
        }
    
        //来至于父类AQS中
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        //公平锁自身提供的实现方法,来保证锁的获取是按照FIFO原则.也就是队列模型,先入先出。
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
           //拿到锁标记的状态值,为0则代表这把锁没人占用
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    //将干活的人的身份标记一下
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                //这里是重入锁的关键代码,只要是获取锁的线程再次去拿这把锁,则可以直接获取成功,
                //并将state的值+1后重新设置,供后面释放锁的时候进行多次释放使用。
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                //这里有个优雅的小细节:咱们发现设置状态时并没有使用compareAndSetState这种方法,
                //而是直接设置。那是因为在这种条件下不会有竞争,只可能是获取锁的线程才能去改变这个值。
                setState(nextc);
                return true;
            }
            return false;
        }
    
        //用来判断是否在它之前已经有人排在队列当中了,如果有,则返回true
        public final boolean hasQueuedPredecessors() {
            Node t = tail; 
            Node h = head;
            Node s;
            //这里返回时的判断条件可能有点难理解。假设当前是A线程。
            //1.第一种情况发生在有一个B线程进度比A快,已经准备开始排队了。可以看下面addWaiter方法
            //的调用,在进行compareAndSetTail交换后,有可能还没来得及将pred.next指向这个新节点node,
            //这个时候说明已经有人在A线程前面去排队拿锁了。
           //2.第二种情况简单明了。A线程不是排在队列的第一个的,也证明了有人排在他前面了。
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    
        //用来添加新的节点到队列的尾部。
        private Node addWaiter(Node mode) {
            //根据传进来的参数mode=Node.EXCLUSIVE,表示将要构造一个独占锁。
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            //tail为空的情况下直接调用enq方法去进行head和tail的初始化。
            if (pred != null) {
               //tail不为空的情况下,将新构造节点的前驱设置为原尾部节点。
                node.prev = pred;
               //使用CAS进行交换,如果成功,则将原尾部节点的后继节点设置为新节点,做双向列表关联;
               //(这里要注意一点,交换成功的同时有其他线程读取该列表,有可能读取不到新节点。例如A线程
               //执行完下方步骤1后,还未执行步骤2,遍历的时候将会获取不到新节点,这也是
               //hasQueuedPredecessors方法中的第一种情况)
               //如果不成功,则代表有竞争,有其他线程修改了尾部,则去调用下方enq方法
                if (compareAndSetTail(pred, node)) {   //1
                    pred.next = node;   //2
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
         private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    //初始化head和tail,初始化完成后,会继续执行外面的死循环,进行compareAndSetTail将
                   //新节点设置到尾部,和上述执行流程一样,这里就不详述了。
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
        //再进行一次尝试和进入堵塞
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //获取当前node的前驱
                    final Node p = node.predecessor();
                    //如果前驱是head的话就再进行一次尝试,这种设计会节约很多的资源。
                    //这里尝试成功后该线程就不会有后续的park和unpark之说了。
                    if (p == head && tryAcquire(arg)) {
                         //如果获取成功就将head设置成当前node,并将存储的thread和prev都清空
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        //来判断进行尝试获取失败后是否进行park
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //node的waitStatus初始化都是0
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                //第一次进来肯定不是-1状态的,需要compareAndSetWaitStatus方法进行设置后才会是-1
                return true;
            if (ws > 0) {
               //这里的作用是用来剔除被cancel后的节点,只要是cancel后的节点waitStatus 都会被标记成1。
                //用该状态来过滤掉这些节点。
               //由于节点的唤醒是由它的prev节点来进行唤醒的,我们必须要保证它的prev是处于活着的状态
               //所以这里一直遍历往上找,总会找到一个正常的prev来帮助其unpark。
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
               //设置prev为-1状态,(该状态下能够唤醒它的下一个去干活)。
                //这里结束后会跳到acquireQueued的死循环再次循环一次。
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        
        //要执行这个方法的前提是shouldParkAfterFailedAcquire这个方法必须返回true
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
        
        //阻塞线程的方法
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            // 设置Blocker,设置为当前lock。
            setBlocker(t, blocker);
            // 等待获取许可,这里会进行堵塞,直到有人帮忙调用该线程的unpark方法才会获取到许可,
            //并继续走下面的流程。
            UNSAFE.park(false, 0L);
            // 设置Blocker,将该线程的parkBlocker字段设置为null,这个是在线程被唤醒后执行的。
            setBlocker(t, null);
        }
    
    unlock方法的解析
        //调用该方法进行解锁
        public void unlock() {
            sync.release(1);
        }
        
        //改变state的值并唤醒队列中的下一个线程来干活
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                //这里会判断头部是不是null,并看其waitStatus 状态是否有唤醒它的后继节点的资格。
                //这里的头部其实也就是当前线程所代表的节点。
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }  
    
        //尝试着释放锁
        protected final boolean tryRelease(int releases) {
                //将锁标记state的值-1
                int c = getState() - releases;
                //如果干活的人和自己的身份不一致,则抛异常出去
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                //这里判断状态-1后是不是等于0。
                //如果不是,则代表重入了很多次,锁暂时不释放。
                //如果是,则将free置为true,释放锁,将身份标记置为null。
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
        }
    
        //去唤醒后继节点中的thread来干活
        private void unparkSuccessor(Node node) {
        
            int ws = node.waitStatus;
            //如果head中的waitStatus<0,则置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            //这里会检查head的下一个节点是不是null以及是否是cancel状态
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                //如果next是cancel状态,则将s置为空,并重队列尾部进行往前遍历,直到找到最后
                //一个waitStatus <=0的node来做为next节点去唤醒
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                //去唤醒s指向的next节点,调用这里可以让UNSAFE.park(false, 0L);处的线程获取到许可。
                //到这里解锁的功能就执行完毕了~
                LockSupport.unpark(s.thread);
        }
    

    总结

    ReentrantLock.png

    扩展

    扩展个ReentrantReadWriteLock 读锁获取锁的流程图


    ReentrantReadWriteLock.png

    End

    相关文章

      网友评论

          本文标题:ReentrantLock源码解读

          本文链接:https://www.haomeiwen.com/subject/shweqqtx.html