美文网首页
ReentrantLock AQS 锁分析

ReentrantLock AQS 锁分析

作者: 石器时代小古董 | 来源:发表于2020-08-14 14:23 被阅读0次

    锁是什么

    锁是资源

    独占模式

    同一时刻只能有一个线程持有锁,其他没有争抢到锁的线程将被阻塞

    ReentrantLock 锁

    ReentrantLock 内部维护了一个 Sync 对象,由它集成了 AQS 对象,实现了公平锁和非公平锁。它通过 CAS 加自旋的方式去争抢锁资源,争抢的方式就是通过 CAS 的方式去尝试改写变量的值,改写成功了就代表争抢所锁成功。如果争抢失败了,就将当前线程封装成一个 Node 对象进行入队,并且会执行一个自旋的操作继续争抢锁,如果没有争抢到会将线程挂起,等待其他线程释放锁并唤醒当前线程,继续争抢锁。

    公平锁在争抢锁时,如果发现队列中已经有其他线程等待了,会将自己封装成 Node 进行入队,并让队列中的头结点去持有锁。

    当占有线程释放锁时,会唤醒后继节点中能唤醒的最近的一个节点。

    源码实现

    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
         
        private final Sync sync;
        
        abstract static class Sync extends AbstractQueuedSynchronizer {
        // 非公平锁
        static final class NonfairSync extends Sync 
        // 公平锁
        static final class FairSync extends Sync 
        // 默认是非公平锁
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
    

    重要参数

    static final class Node {
            // 共享模式
            static final Node SHARED = new Node();
            // 独占模式
            static final Node EXCLUSIVE = null;
            static final int CANCELLED =  1;
            // 当前节点需要被唤醒
            static final int SIGNAL    = -1;
            // 当前 Node 的状态
            volatile int waitStatus;
            // 父节点
            volatile Node prev;
            // 后继节点
            volatile Node next;
            // 当前的线程
            volatile Thread thread;
    }
            
    
    // 当前占有所得节点
    private transient volatile Node head;
    // 尾结点
    private transient volatile Node tail;
    // 锁资源,所有线程通过 CAS 加自旋的方式争抢这个资源
    private volatile state;// 独占模式 0 表示没有锁 1 有锁
    
    

    公平锁加锁方式

       // 尝试获取锁
       // 获取锁失败通过 addWaiter 将当前线程封装成 Node
       // 将 Node 进行入队 并挂起该线程
       public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
     protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                // 获取当前线程锁的状态
                int c = getState();
                // 如果没有锁
                if (c == 0) {
                    // 检查它的前面是否有其他线程等着 hasQueuedPredecessors 返回 false 当前前面没有线程 true 代表有线程
                    // compareAndSetState(0, acquires) 当前线程通过 CAS 方式更新锁是否成功
                    
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        // 设置当前线程独占锁
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果当前线程就是占有锁的线程
                // C!= 0 时,表示已经有线程抢到资源了
                else if (current == getExclusiveOwnerThread()) {
                    // 累加锁的个数,属于冲入锁的逻辑
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                // 抢占失败
                return false;
            }
    

    将 Thread 包装成 Node 进行入队

       private Node addWaiter(Node mode) {
            Node node = new Node(mode);
            // 自旋加 CAS 更新节点
            for (;;) {
                // 快速入队 将当前节点链接到当前 tail 的后面
                Node oldTail = tail;
                if (oldTail != null) {
                    U.putObject(node, Node.PREV, oldTail);
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        return node;
                    }
                } else {
                    // 完整入队,这是第一次发生争抢的 thread,需要初始化 head 节点,将该任务链接到 head 节点后,并且作为尾节点
                    initializeSyncQueue();
                }
            }
        }
    

    acquireQueue 挂起当前线程

    acquireQueued 会开启一个自旋不断尝试获取锁,直到获取成功。没有获取到时会挂起线程。等到释放锁时被唤醒。

    final boolean acquireQueued(final Node node, int arg) {
            try {
                // 当前线程是否被中断了
                boolean interrupted = false;
                // 自旋不断尝试获取锁,
                // 当没有获取锁时会被挂起,等待被唤醒后继续执行循环
                for (;;) {
                    // 获取当前节点的上一个节点
                    final Node p = node.predecessor();
                    // 如果它的上一个节点是 head 节点,它做为下一个节点可以取抢占资源
                    if (p == head && tryAcquire(arg)) {
                        // 抢占成功了,设置当前线程为 head 节点
                        setHead(node);
                        p.next = null; // help GC
                        // 不中断线程 返回 false
                        return interrupted;
                    }
                    // shouldParkAfterFailedAcquire true 需要挂起 false 不挂起
                    // parkAndCheckInterrupt 挂起当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
            }
        }
    

    判断线程是否需要挂起

     // pred 当前节点的父节点
     // node 当前节点
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获取当前节点的状态
            int ws = pred.waitStatus;
            // 表示父节点可以唤醒下一个节点
            if (ws == Node.SIGNAL)
                return true;
            // 前置节点是 cancel 状态
            if (ws > 0) {
                // 找一个内唤醒的前置节点,舍弃 cancel 状态的节点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
            }
            return false;
        }
    
       private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    release 释放锁方法

     
      public final boolean release(int arg) {
            // 释放锁成功
            if (tryRelease(arg)) {
                Node h = head;
                // 唤醒下一个节点
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
       protected final boolean tryRelease(int releases) {
                // 修改资源值
                int c = getState() - releases;
                // 当前不是占用线程
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 如果当前锁状态已经是 0 
                if (c == 0) {
                    // 设置释放成功
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
        private void unparkSuccessor(Node node) {
            if (ws < 0)
                node.compareAndSetWaitStatus(ws, 0);
            // 获取当前节点的下一个节点
            Node s = node.next;
            // 如果 next 节点是一个 cancel 状态的节点,继续寻找后面可以被唤醒的节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node p = tail; p != node && p != null; p = p.prev)
                    if (p.waitStatus <= 0)
                        s = p;
            }
            // 唤醒下一个节点所对应的线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }    
    

    相关文章

      网友评论

          本文标题:ReentrantLock AQS 锁分析

          本文链接:https://www.haomeiwen.com/subject/kxmgdktx.html