美文网首页多线程一些收藏
ReentrantLock重入锁和 AQS同步器源码解析

ReentrantLock重入锁和 AQS同步器源码解析

作者: Cloud_Leung | 来源:发表于2020-03-31 15:15 被阅读0次

    ReentrantLock重入锁和 AQS同步器源码解析

    • AQS就是AbstractQueuedSynchronizer,是一个java的同步器,用来管理多线程对共享资源的争抢,以及对线程的排队和唤醒。
    • 同步器支持独占式的获取资源也支持共享式的获取资源
    • 独占式资源需要实现tryAcquire和tryRelease方法
    • 共享式资源需要实现tryAcquireShared和tryReleaseShared
    • 同步器内部维护了一个双向链表,用来保存争抢锁的队列
    • 同步器主要利用cas实现线程安全
    • java api的ReentrantLock(可重入锁)和CountDownLatch都利用了同步器来实现
    • ReentrantLock是利用独占式资源实现
    • CountDownLatch是利用共享式资源实现

    ReentrantLock 可重入锁分析

    • 可重入锁分为公平锁和非公平锁

    • 公平锁是所有争抢锁的线程会形成一个队列,以先来先得顺序获取锁

    • 非公平锁是每个新到的线程都会去尝试获取一次锁,获取不到才会排到队列里

    • 非公平锁NonfairSync分析

      • Sync继承自AQS,NonfairSync 继承自 Sync

      • NonfairSync实现了lock方法以及 tryAcquire方法,同时Sync里有一个nonfairTryAcquire方法用于非公平锁获取锁

      • 接下来开始看非公平锁的源码解析,首先是lock方法

      • final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        // lock方法是加锁方法,该方法会首先用cas尝试获得一次锁,如果成功设置获得锁线程会当前线程
        // 否则执行acquire加锁方法,该方法是AQS的方法
        
      • public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        // 该方法首先执行左边tryAcquire尝试获得锁,该方法由子类NonfairSync实现,
        // 如果获得锁则&&后面不执行,如果不能获得锁,则执行acquireQueued方法将锁加入队列。
        // 而tryAcquire方法在子类实际是调用Sync的nonfairTryAcquire方法
        
      • final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        // 该方法会获取当前资源状态,state,0代表无人持有锁,大于0则代表由线程持有锁
        // 如果是0,则尝试cas获取,如果当前获取锁的线程是当前线程,则将锁状态加1,代表重入一次
        // 否则获取不到锁返回false
        
      • private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
        // 该方法会将当前获取锁的线程封装成Node链入链表尾部,
        // 如果当前链表是空的话,会执行enq初始化一个链表
        
      • private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
        // 这里首先会开启一个循环自旋,然后判断链表是否是空,
        // 如果是空会初始化一个空节点作为链表的头,然后在将当前节点链入节点的尾部
        // 那么为什么要把当前节点链表链入空节点后,而不是head节点呢,
        // 这是因为head节点始终是作为当前持有线程的节点,
        // 但是当前持有线程的节点可能会因为直接争抢到锁而没有进入链表,比如当前这个情况,
        // 该线程没获得锁,但是链表空的,所以要把他自己放在第二个位置
        
      • final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        // 节点链入链表后,会开启自旋为自己获取锁,
        // 首先会判断当前节点的前置节点是不是head节点,
        // 如果是说明当前节点可以尝试获取一次锁,如果获取到则返回,
        // 获取不到则会shouldParkAfterFailedAcquire,将前置节点设置为Signal,
        // 告诉他释放锁后进行通知,然后会执行parkAndCheckInterrupt,
        // 该方法内部会执行LockSupport.park(this);,该行代码会将当前线程阻塞,直到被unPark唤醒
        // 当然 可能线程可能被interrupted而结束
        
      • 到这里加锁方法就结束了,接下来分析一下锁的释放,释放锁的方法

      • public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        // 释放锁的方法是由重入锁的unLock方法调用AQS的release方法实现
        // 该方法首先会执行tryRelease 进行锁的释放,该方法是由Sync实现,
        // 释放后会判断链表节点是否是空,并且首节点是否不等于0(Signal状态是-1),
        // 然后执行唤醒下个线程的操作unparkSuccessor
        
      • protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        // 先看Sync的tryRelease方法,先判断当前线程是否是获取到锁的线程,如果不是则抛出异常
        // 接下来当锁状态减1(参数就是1)后是否等于0,等于0代表锁已经完全释放,可以返回true,否则返回false
        // 只有返回true AQS才会去处理唤等待线程
        
      • private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
        // 在这里,如果状态Signal(小于0)代表是需要通知,然后首先将首节点设置为0
        // 然后获取到下一个节点,如果下一个节点是null或者下一个节点的状态是废弃(状态大于0代表是废弃的节点),
        // 则从尾部开始逐渐向前着,直到找到一个可用节点(小于等于0的节点),
        // 这里为什么要从尾部向前找呢?我也不知道。。。。
        // 执行unPark唤醒找到的节点对应的线程
        
      • 释放锁的流程也就这么结束了,这就是非公平锁的流程,那么公平锁的流程是什么样的呢?刚才我们提到了,非公平锁的不公平点在于每个新来争抢的线程都会去尝试获取一次锁,也就是插队加塞一次,获取不到才会去乖乖排队,那么公平锁在获取锁的时候一定就是要判断有没有人在等待,有的话就去排队,没有才去获取锁,接下来看看源码

      • 在FairSync下的tryAcquire方法

      •     protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
        // 所以我们看到,线程在锁资源没有人持有的情况下,会首先执行hasQueuedPredecessors判断是否有线程在等待。
        // 如果没人等待才会执行cas获取锁。
        // 而如果持有锁的是当前线程会将锁资源加1,这和非公平锁一致。
        //最后如果因为有人等待没获取到锁,AQS的acquire方法会执行acquireQueued方法将节点链入链表
        // 而释放锁对于非公平锁和公平锁来说都是一致的。
        // 这就是非公平锁和公平锁不一致的地方
        

    相关文章

      网友评论

        本文标题:ReentrantLock重入锁和 AQS同步器源码解析

        本文链接:https://www.haomeiwen.com/subject/viiwuhtx.html