美文网首页
ReentrantLock 源码分析

ReentrantLock 源码分析

作者: 想起个帅气的头像 | 来源:发表于2020-11-22 16:25 被阅读0次

    ReentrantLock 作为常用的多线程下锁的一种实现类,常和synchronized进行比较,本篇主要介绍ReentrantLock常用方法的源码实现。

    架构总览

    先贴一下类的整体结构:


    源码介绍

    里面有几个关键类:
    AbstractQueuedSynchroinzer
    Sync
    FairSync:公平锁实现
    NonfairSync:非公平锁实现

    AQS 做为JUC包内的核心类,定义了一系列的实现乐观锁的规范及实现。如
    tryAcquire:定义尝试获取锁的方法定义,由不同的实现类进行实现。

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    acquire:定义了独占模式下线程如何入队。

    /**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    ReentrantLock内部通过Sync实现了AQS的方法定义,并提供了常用获取锁的方法,下面主要介绍下常用方法的实现过程。

    tryLock()

    /**
         * Acquires the lock only if it is not held by another thread at the time
         * of invocation.
         *
         * <p>Acquires the lock if it is not held by another thread and
         * returns immediately with the value {@code true}, setting the
         * lock hold count to one. Even when this lock has been set to use a
         * fair ordering policy, a call to {@code tryLock()} <em>will</em>
         * immediately acquire the lock if it is available, whether or not
         * other threads are currently waiting for the lock.
         * This &quot;barging&quot; behavior can be useful in certain
         * circumstances, even though it breaks fairness. If you want to honor
         * the fairness setting for this lock, then use
         * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }
         * which is almost equivalent (it also detects interruption).
         *
         * <p>If the current thread already holds this lock then the hold
         * count is incremented by one and the method returns {@code true}.
         *
         * <p>If the lock is held by another thread then this method will return
         * immediately with the value {@code false}.
         *
         * @return {@code true} if the lock was free and was acquired by the
         *         current thread, or the lock was already held by the current
         *         thread; and {@code false} otherwise
         */
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
    

    里面只有一行代码,如果在new ReentrantLock时没有指定参数,或指定为false,默认使用非公平锁实现。这里的参数1指每lock一次,计数器加1。

        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    先看下非公平锁中的nonfairTryAcquire实现:
    state是ReentrantLock中的volatile全局参数,用于标记当前锁是否已被抢占,默认是0,每抢占一次增加acquires次(一般都是1次)。

    compareAndSetXXX是以乐观锁cas的方式,尝试修改一个变量的值。本质是调用操作系统cmpxchg指令,通过与期望值进行比较,相同则修改,不同则不修改。一般配置while(true)使用。

    /**
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                //state是一个全局的volatile参数
                int c = getState();
                //如果c是0,则表示没有线程占有锁,可以尝试抢占
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        //如果抢占成功,则设置拥有独占锁的线程是current线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果不是0,则代表锁已经被抢占,此时判断抢占锁的线程是不是当前线程本身
                else if (current == getExclusiveOwnerThread()) {
                    //如果是重入,则增加重入的次数
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    //因为已经是当前线程占有锁,不需要再通过cas修改
                    setState(nextc);
                    return true;
                }
                // 如果发现其他线程已经占有锁,则返回false
                return false;
            }
    

    lock()

    /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    lock方法里只有一个if判断,首先尝试获取锁,成功则已,不成功就进入acquire。

    acquire()

    /**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    只有几行的方法,全部都是方法的封装。格式化一下写法

    public final void acquire(int arg) {
        if(!tryAcquire(arg)) {
            Node node = addWaiter(Node.EXCLUSIVE);
            boolean interrupted = acquireQueued(node, arg);
            if(interrupted) {
                selfInterrupt();  
            }
        }
    }
    

    tryAcquire()的非公平锁的实现已经在上面介绍过了,如果是公平锁的实现

    /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&   //就多了这么一个区别
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    hasQueuedPredecessors() 就多了这么一个判断,也就是如果等待队列中有线程在等待,就不再尝试抢占,直接返回false,后续也加到等待队列中。

    如果没抢到锁,则将当前线程添加到等待队列中,也就是addWaiter方法。

    addWaiter()

    /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            // 如果队列不为空,先快速尝试一次入队到尾节点,如果没成功就进入完成的enq调用。
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
        /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    addWaiter方法首先把当前线程封装为Node对象,且初始时head和tail节点都为null,如果是第一次调用,会先进入到enq方法中。
    通过for循环始终确保当前node可以入队。
    如果是第一次入队,会new一个空的node。head和tail都指向这个空节点。
    如果是队列中已有,则把node作为添加到队列最后,tail指向node,node和前一个组成双向链表。

    前一个 ---next---> node
    前一个 <--prev--- node
    tail -----> node

    入队后,至此addWaiter方法完成。

    acquireQueued()

     /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //如果线程被中断,需要返回中断标示,
                        //因为parkAndCheckInterrupt里的Thread.interrupted()会复位标示。所以在这里通过变量返回。
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    acquireQueued 主要用于tryAcquire或者park。
    首先判断当前node的的前一个节点是不是头节点,如果是,且获取锁成功,则把自己设置成head并返回。
    否则进入shouldParkAfterFailedAcquire方法。

    注意:这里的interrupted标志是否被中断过,因为内部的中断已经被Thread.interrupted()复位,通过此变量返回到上层方法,调用selfInterrupt重新设置一次中断。
    目的是如果业务的代码中需要针对是否中断做逻辑处理,则通过selfInterrupt来再次触发中断来通知业务。

    shouldParkAfterFailedAcquire

    /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    shouldParkAfterFailedAcquire方法主要判断当前node能否被park。
    被park的前提是node的pred节点的waitStatus必须是Signal状态。

    parkAndCheckInterrupt

    /**
         * Convenience method to park and then check if interrupted
         *
         * @return {@code true} if interrupted
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    如果shouldParkAfterFailedAcquire返回true,则可以park。当被唤醒时,返回线程是否被中断过。

    至此,主要的lock方法已经说明完成。

    unlock()

    /**
         * Attempts to release this lock.
         *
         * <p>If the current thread is the holder of this lock then the hold
         * count is decremented.  If the hold count is now zero then the lock
         * is released.  If the current thread is not the holder of this
         * lock then {@link IllegalMonitorStateException} is thrown.
         *
         * @throws IllegalMonitorStateException if the current thread does not
         *         hold this lock
         */
        public void unlock() {
            sync.release(1);
        }
    
    /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    unlock 的处理比较简单,通过tryRelease来判断重入锁是否已经全部释放完成。全部释放则开始unpark后继节点。

    unparkSuccessor

    /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    先将ws变量设置成0,防止重复唤醒。再判断next节点是否已经被取消,如果取消了。就从tail开始,倒序遍历,找到waitStatus<=0的node,unpark。

    cancelAcquire()

    /**
         * Cancels an ongoing attempt to acquire.
         *
         * @param node the node
         */
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
    
            node.thread = null;
    
            // Skip cancelled predecessors
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED;
    
            // If we are the tail, remove ourselves.
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    

    最后说明下node是如何cancel的。

    1. 先判断pred节点是不是也被cancel了。如果是就一直向前找到没有cancel的节点。
    2. 如果当前节点已经是tail了,则将pred.next = null;
    3. 如果当前节点不是tail,且
      3.1 pred节点不是head, 且
      3.2 pred的ws是signal 或 pred的ws可以修改成signal
      3.3 当前节点的next也没有被取消
      以上条件都满足,就将pred.next = node.next 上。

    否则,唤醒node的next节点,即unparkSuccessor。unparkSuccessor内部有处理,如果next是null,从tail开始倒序依次唤醒。

    相关文章

      网友评论

          本文标题:ReentrantLock 源码分析

          本文链接:https://www.haomeiwen.com/subject/upajiktx.html