美文网首页
Lock非公平锁源码解读

Lock非公平锁源码解读

作者: 断忆残缘 | 来源:发表于2021-04-15 18:31 被阅读0次

    ReentrantLock锁源码解析

    每当谈起锁,我一定要说一下大神Doug Lea,可以说是java并发编程鼻祖,juc包的作者,把每一行代码的性能做到了极致,曾经吊打jvm的synchronized锁,直接导致了jvm项目组对synchronized锁进行了优化。

    先上调试demo代码,并发编程没有场景,很难理解源码。

        static Lock lock = new ReentrantLock();
        
        public static void main(String[] args) {
            //t1
            new Thread(() -> {
                try {
                    lock.lock();
                                    // 睡眠一天
                    TimeUnit.SECONDS.sleep(60*60*24);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
    
            }, "T1").start();
    
    
            //t2
            new Thread(() -> {
    
                lock.lock();
                log.info("获取到锁了");
                lock.unlock();
            }, "T2").start();
        }
    
    

    我特意画了一张非公平锁等待队列的图,方便大家理解非公平锁等待队列,因为这是难点,其他部分不是很难。

    lock非公平锁的等待队列.png

    推荐学习方法:可以边调试demo,边看我的源码注释,因为Doug Lea循环逻辑很绕,不调试很难看懂。

    NonfairSync非公平锁#lock方法

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
            /**
             * state锁状态
             * 0表示没有线程持有的状态
             * 1表示有线程持有
             *
             * cas操作是原子的,所以当多个线程来竞争的时候只有一个线程能获取到锁
             * cas解释,判断当前state状态是否为0,如果是0就改为1,反之失败
             *
             * T1线程获取到锁
             */
        if (compareAndSetState(0, 1))
                /**
                 * 将持有锁的线程,记录到锁中。
                 *
                 * 反思点:为啥要记录持有锁的线程,个人认为是为了锁重入,后面代码会讲到。
                 */
            setExclusiveOwnerThread(Thread.currentThread());
        else
                /**
                 * 核心方法,主要一下几个功能
                 * 1. 尝试获取锁
                 * 2. 创建等待队列
                 * 
                 */
            acquire(1);
    }
    

    从上面代码,我们能看出Doug Lea写的代码真的是很精练,没有很很深的功力,个人觉得写不出这样的代码。

    public final void acquire(int arg) {
        /**
         * 尝试获取锁
         *
         * 反思点:在前面已经尝试获取过一次锁了,为啥这边要第二次尝试获取锁
         * 1.在高性能的cpu时代,每一条命令执行都是很快的,锁的持有和释放都是瞬间的,有可能立马就获取到锁了
         * 2.主要原因还是减少park,减少用户态和内核态的切换,提高性能
         */
        if (!tryAcquire(arg) &&
            /**
             * 创建/加入等待队列
             */
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        // 非公平锁尝试获取锁
        return nonfairTryAcquire(acquires);
    }
    
    /**
     * 返回结果true获取到锁,反之就没有获取到锁
     */
    final boolean nonfairTryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取当前锁的状态
        int c = getState();
      
        // 如果为0,表示锁未被任何线程持有
        if (c == 0) {
            // cas操作尝试获取锁
            if (compareAndSetState(0, acquires)) {
                // 获取到锁后,设置线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        /**
         *如果持有锁的线程为当前线程,锁重入(如果不记录那个线程持有,是不是没法做锁重入)
         *
         */
        else if (current == getExclusiveOwnerThread()) {
            /**
             * 因为同一个线程,所以以下操作都不需要原子性,因为只有一个线程在操作
             *
             * 锁的重入就是给state+1,表示同一个线程又重新进入一次,当放弃持有会state01
             * 当state=0,表示线程释放锁
             */
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    /**
     * 创建一个新的等待节点
     */
    private Node addWaiter(Node mode) {
        // 创建一个新的节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 判断等待队列是否有尾节点,如果有把当前节点设置为尾节点
        Node pred = tail;
        // 如果尾节点不为空
        if (pred != null) {
            // 设置当前的节点的pre为前尾节点
            node.prev = pred;
            // cas设置当前节点为尾节点
            if (compareAndSetTail(pred, node)) {
                // 设置前为节点的next为当前节点
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    /**
     * 将节点插入队列,进行必要化的初始化
     *
     */
    private Node enq(final Node node) {
        for (;;) {
            // 获取尾节点
            Node t = tail;
            if (t == null) { // Must initialize
                // 用空节点初始化尾节点
                if (compareAndSetHead(new Node()))
                    // 头节点也指向同一个新节点
                    tail = head;
            } else {
                // 当前线程节点prev指向尾节点
                node.prev = t;
                // cas操作,将当前线程节点设置为尾节点
                if (compareAndSetTail(t, node)) {
                    // 前尾节点的next节点设置为当前线程节点
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
     
    /**
     * 加入等待队列,并进行park,直到被唤醒,重新获取到锁。
     * 如果不行,继续park
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                    // 获取node节点的prev节点
                final Node p = node.predecessor();
                /**
                 * 判断是否为头节点
                 * 如果是,在尝试获取一次锁
                 */
                if (p == head && tryAcquire(arg)) {
                    // 设置头节点
                    setHead(node);
                    // 清空上一个节点对当前节点的引用(help GC)
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 进入park,等待被唤醒。
                    // 唤醒之后,此方法返回false,不会进入下面的(interrupted = true)
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        /**
         * 将当前节点变为空的头节点
         * 
         * 反思点,为啥要有空头节点
         *  个人观点:防止重复初始化等待队列,带来的性能损耗。
         *  因为让没有等待线程时,队列中依然有一个空节点
         */
        head = node;
        node.thread = null;
        node.prev = null;
    }
    

    抱怨一下:Doug Lea 为啥能把代码写的如此精炼,为啥???

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    /**
     * 设置标识,用来当前执行完,是否唤醒下一个节点(难度较高)
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取上一个节点状态
        int ws = pred.waitStatus;
        // 如果是SIGNAL状态,直接true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            /*
             * 如果前一个节点状态是取消节点就跳过,递归找上一个非取消节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 设置status为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    释放锁

    /**
     * 释放锁
     */
    public void unlock() {
        // 释放一次线程持有
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        // 尝试释放锁
        if (tryRelease(arg)) {
            // 获取头节点
            Node h = head;
            // 判断头节点状态
            if (h != null && h.waitStatus != 0)
                // 唤醒队列中的第一个等待线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    /**
     * 尝试释放锁
     *
     * 反思点:尝试释放锁为啥没有任何cas操作
     * 因为只有当前线程持有这把锁,别的线程无法持有,所以所有的操作都是原子性的
     */
    protected final boolean tryRelease(int releases) {
        // 释放一次锁
        int c = getState() - releases;
        // 判断当前线程是否为持有线程
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 当前线程是否释放锁标识
        boolean free = false;
        // c==0表示释放锁
        if (c == 0) {
            free = true;
            // 锁中的线程清空
            setExclusiveOwnerThread(null);
        }
        // 设置state标识
        setState(c);
        return free;
    }
    
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 查看节点等待状态
        int ws = node.waitStatus;
        // 设置头节点(空节点)状态为0,不需要唤醒别的节点
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 获取头节点的下一个真实节点
        Node s = node.next;
        // 找到下一个非空或者非取消节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒park线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    到此大家应该已经学习到lock非公平锁的核心内容了吧

    思考问题:

    1. lock非公平锁,在park之前,最多进行几次尝试获取锁?
    2. 通过非公平锁学习,自己学习一下公平锁,回答公平锁与非公平锁那个效率更高,为什么?

    大家可以在评论区讨论答案,也可以检查一下,自己掌握的程度。

    相关文章

      网友评论

          本文标题:Lock非公平锁源码解读

          本文链接:https://www.haomeiwen.com/subject/ulkjlltx.html