美文网首页
关于ReentrantLock的简单分析

关于ReentrantLock的简单分析

作者: 梦想实现家_Z | 来源:发表于2019-03-17 22:41 被阅读0次

1.ReentrantLock的基本使用方式
以下代码是我从ReentrantLock类注释中找到的使用示例:

class X {
        private final ReentrantLock lock = new ReentrantLock();

        public void method() {
            //上锁
            lock.lock();
            try {
                // 执行方法
            } finally {
                //解锁
                lock.unlock();
            }
        }
    }

先了解一下ReentrantLock的结构图:


46F1DD34-6603-43B5-A379-544AB060A4E6.png

先来看一下new ReentrantLock()做了什么操作:

  /**
     * 构造函数,默认是构造非公平锁
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

所以根据构造函数的提示,我们来看一下NonfairSync:


D20446F3-577A-402B-83CC-0AA5F97CB066.png
/**
 * 非公平锁
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * 上锁
     */
    final void lock() {
        //利用CAS设置state的值,如果当前state是0,并且成功设置为1,说明当前线程成功地拿到了锁
        // (ps:这是公平锁和非公平锁的区别)
        // 非公平锁一开始是直接去抢占锁,抢占失败才会去排队
        if (compareAndSetState(0, 1))
            //成功拿到锁后,将当前锁地某个字段标记为当前线程(方便后面重入)
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //抢占失败,准备去排队
            acquire(1);
    }

    /**
     * 尝试去获取锁.
     * 如果能直接将state从0设置为1,就拿到锁,返回true;
     * 如果是重入锁,就在state上再加acquires,并返回true.
     * 否则返回false
     * (ps:后面会深入代码去验证这个逻辑)
     *
     * @param acquires
     * @return
     */
    protected final boolean tryAcquire(int acquires) {
        //nonfairTryAcquire(acquires)是调用了父类Sync的实现。
        return nonfairTryAcquire(acquires);
    }
}

从非公平锁往上一直延伸到Sync:

/**
 * Sync
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * 因为公平锁和非公平锁的加锁逻辑不同,需要给出一个抽象的lock()方法,让两个子类有不同的实现
     */
    abstract void lock();

    /**
     * 尝试获取锁,默认非公平锁。(对应非公平锁中的tryAcquire()方法)
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //判断当前state是否是0
        if (c == 0) {
            //尝试获取锁
            if (compareAndSetState(0, acquires)) {
                //成功后设置线程标记,并返回true
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            //如果是重入锁,直接在state上加acquires,并设置回state,返回true
            int nextc = c + acquires;
            if (nextc < 0) // 不合法的结果直接抛错
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //既不能直接抢占锁,也不是重入锁,加锁失败,返回false
        return false;
    }

    /**
     * 尝试释放锁
     *
     * @param releases
     * @return
     */
    protected final boolean tryRelease(int releases) {
        //先尝试state减掉releases,但此时并没有设置回state
        int c = getState() - releases;
        //如果不是已经抢占到锁的线程去释放锁,就会抛异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //如果释放后的state=0,就将锁的线程标记置为null
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        //将释放后的结果设置回state(ps:state有可能是释放一部分,除非释放后state=0,否则此时锁还是被这个线程占有)
        setState(c);
        //返回结果,只有全部释放后才会返回true
        return free;
    }
}

按照lock()方法的调用链寻找,发现acquire(1)是通过父类AbstractQueuedSynchronizer实现的

AQS

public final void acquire(int arg) {
       /**
         * 1.先尝试直接获取锁,拿到锁后返回true
         * 2.在判断是否可以重入,重入后返回true
         * 3.否则返回false
         * tryAcquire(arg)---->NonfairSync.tryAcquire(arg)---->Sync.nonfairTryAcquire(arg)
         *
         *
         * 1.初始化head节点和tail节点
         * 2.将node节点添加在尾部
         * addWaiter(Node.EXCLUSIVE)
         *
         * 1.在获取锁或等待的过程中是否被中断
         * 2.被中断就执行selfInterrupt()
         * acquireQueued(Node,arg)
         */
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //如果没有拿到锁,并且在等待unpark的过程中被打算了,那就直接中断当前线程
            selfInterrupt();
    }

   /**
     * 中断当前线程
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

   /**
     * 向双向链表尾部添加Node,最后返回当前节点
     *
     * @param mode
     * @return
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 这是一个快速添加node的方式(即假设链表已经被初始化,就可以直接加节点,否则执行enq初始化链表),失败就立马执行enq方法
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //利用CAS设置tail节点为当前node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //enq方法的返回值不影响node
        enq(node);
        // 返回刚刚添加在尾部的node节点,但不代表该节点现在是tail节点,
        // 有可能此时另一个线程又在后面加了一个节点
        return node;
    }

   /**
     * 使用自旋的方式向双向链表尾部添加Node,并且返回Node前面的节点
     * 1.如果当前链表中没有任何节点的话,需要做一个初始化,会new Node()初始化head和tail节点
     * 2.初始化后会将需要添加的Node节点加在链表尾部
     * 3.最后返回刚刚添加的Node的前一个节点
     *
     * @param node
     * @return
     */
    private Node enq(final Node node) {
        //自旋,有一点不好的就是,如果一直自旋的话,cpu会飙高,但是概率特别低
        for (; ; ) {
            Node t = tail;
            if (t == null) {
                //CAS初始化head节点
                if (compareAndSetHead(new Node()))
                    //初始化tail节点
                    tail = head;
            } else {
                //将node加到链表尾部,并且返回node的前置节点
                node.prev = t;
                //CAS设置tail节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

   /**
     * 在获取锁或等待的过程中是否被打断
     *
     * @param node
     * @param arg
     * @return
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋
            for (; ; ) {
                //获取node节点的前置节点
                final Node p = node.predecessor();
                //如果前置节点是头节点,并且获取了锁
                if (p == head && tryAcquire(arg)) {
                    //将node设置为head节点
                    setHead(node);
                    //此时p已经不是head节点,下面一行代码是为了让p成为一个游离对象,更快被回收
                    p.next = null;
                    //设置failed=false,在finally中不会执行cancelAcquire()方法
                    failed = false;
                    //如果成功拿到锁,返回false
                    return interrupted;
                }
                //
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //出现异常才会执行cancelAcquire()
            if (failed)
                cancelAcquire(node);
        }
    }

   /**
     * 判断node是否可以准备park
     *
     * @param pred
     * @param node
     * @return
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取pred节点的状态
        int ws = pred.waitStatus;

        if (ws == Node.SIGNAL)
            /*
             *  代表后续节点需要唤醒
             *  pred已经准备好unpark它的后继节点了,所以node节点可以准备park了
             */
            return true;
        if (ws > 0) {
            /*
             * 删除连续CANCELLED状态的节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 设置pred的状态为SIGNAL
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

   /**
     * park当前线程,直至被中断或者unpark
     *
     * @return
     */
    private final boolean parkAndCheckInterrupt() {
        //需要通过LockSupport.unpark()解除阻塞
        LockSupport.park(this);
        return Thread.interrupted();
    }

   /**
     * 如果执行这个方法,说明当前线程在等待锁的过程中被中断,这个时候就需要把node节点从链表中删除
     *
     * @param node
     */
    private void cancelAcquire(Node node) {
        // 如果node=null,就忽略
        if (node == null)
            return;
        //该node不再关联任何线程
        node.thread = null;

        // 寻找一个有效的前置节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        /**
         * 记录下pred.next,用于后面CAS更新
         * 上面的代码一直在设置node.prev和prev,并没有设置prev.next,所以这个pred.next并不一定是node节点
         */
        Node predNext = pred.next;

        // 将node状态设置为CANCELLED,其他线程也可以主动删除它
        node.waitStatus = Node.CANCELLED;

        // 如果node是tail节点,将prev节点设置为tail节点,并且将prev.next设置为null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 如果node节点既不是head节点,也不是tail节点
            int ws;
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                //获取node节点的下一个节点
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    //将prev的下一个节点设置为next
                    compareAndSetNext(pred, predNext, next);
            } else {
                //剩下的就是如果node是head节点,唤醒node节点的下一个有效节点
                unparkSuccessor(node);
            }
            // help GC
            node.next = node;
        }
    }

   /**
     * 唤醒node节点的下一个节点
     * @param node
     */
    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            //如果node节点的状态小于0,就将它的状态置为0,表示已经完成了
            compareAndSetWaitStatus(node, ws, 0);

        //获取node节点的下一个节点
        Node s = node.next;
        //判断该节点是否有效
        if (s == null || s.waitStatus > 0) {
            s = null;
            //如果该节点无效,从tail节点向前遍历,直至找到距离node节点最近的有效节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    //将s设置为有效节点
                    s = t;
        }
        if (s != null)
            //唤醒该有效节点
            LockSupport.unpark(s.thread);
    }

以上为lock.lock()加锁的整个调用链路

现在分析一下解锁的代码

/**
     * 解锁
     */
    public void unlock() {
        sync.release(1);
    }


    /**
     * Sync抽象类中的解锁
     *
     * @param arg
     * @return
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //如果解锁成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //准备唤醒head的下一个有效节点
                unparkSuccessor(h);
            //返回true
            return true;
        }
        //解锁失败
        return false;
    }

    /**
     * 尝试解锁
     *
     * @param releases
     * @return
     */
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        //如果解锁的线程不是当初获取锁的线程,就抛异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            //如果释放后c=0,才设置锁的线程标记为null,解锁成功
            free = true;
            setExclusiveOwnerThread(null);
        }
        //如果c!=0,说明是一个重入锁,只需要将计算后的值设置回state,但是不能删除锁的线程标记
        setState(c);
        return free;
    }

总结:上面的代码分析只是简单地分析了一下默认的非公平锁的加锁和解锁机制,和公平锁的加锁解锁机制有一些简单的区别,大家可以根据以上分析思路独自分析,谢谢

相关文章

网友评论

      本文标题:关于ReentrantLock的简单分析

      本文链接:https://www.haomeiwen.com/subject/zkoumqtx.html