美文网首页
reentrantLock锁

reentrantLock锁

作者: justonemoretry | 来源:发表于2022-05-04 21:55 被阅读0次

锁的实现原理

主要依赖于一个volatile的变量state,加锁时通过cas操作将这个变量值由0改为1,这里使用volatile变量,是利用volatile变量写读,带有的锁释放-锁获取语义,保证加锁期间的共享变量的可见性。

lock实现源码

lock锁的实现主要是持有sync类,这个类又继承自AbstractQueuedSynchronizer,这个aqs中完成主要的加解锁操作,子类中实现具体的差异点。
下面贴一下具体代码:
lock持有sync对象

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
   /**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds the lock then the hold
     * count is incremented by one and the method returns immediately.
     *
     * <p>If the lock is held by another thread then the
     * current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired,
     * at which time the lock hold count is set to one.
     */
    public void lock() {
       // lock及tryAquire等加锁、解锁方法,均通过sync对象实现
       // 这个sync对应可以是公平锁或非公平锁
        sync.lock();
    }

sync类实现

   /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 持锁情况下,直接赋值即可
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

AQS代码
这里面代码很多,先简单贴下等待队列的head和tail,以及volatile变量state,整体的实现和操作系统互斥锁比较像,试着获取锁,锁已被其它线程获取时,进行一定的自旋,还是拿不到锁,线程挂起放到等待队列中。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

公平锁与非公平锁

公平与非公平的主要区别就是,在获取锁时,是否看队列中是否还有其它等待线程,非公平锁直接执行cas操作,试图修改共享变量state,而公平锁则是先判断是否有其它线程等待获取锁的时间比当前线程更长,有的话则将线程放到等待队列中。
公平锁加锁实现

  /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            // 上面sync.lock()直接调到这里,这里面的acquire会先调
            // 用下面的tryAcquire试图获取锁,获取锁失败走后续的自旋
            // 后挂起在等待队列上的操作
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 这里volatile读,添加锁获取的语义
            int c = getState();
            if (c == 0) {
                // 这里是公平锁和非公平锁最大的区别,等待队列中是否有排列好的前任线程
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

hasQueuedPredecessors()关键方法
这段代码不长,但是表达式写得并不好懂,不过注释比较清晰,就是是否有其它线程等待获取锁的时间比当前线程长。

/**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     *  <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread() &&
     * hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread.  Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).  For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     *  <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
       // 这里的写法不太好理解,上面注释写得有,等同于队列不为空
       // 且当前线程不是head后面的第一个线程,这里面还有个很有意思的判断,
      // s==null也可以代表不是第一个,这里这么写是因为node节点插入双向链表时,并不是原子操作,而是
      // 先cas设置tailf指向新插入节点,然后再设置header.next指向tailf,在后面这一步之前,就会出现s == null
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

非公平锁抢锁操作

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            // lock进来就用cas操作试图抢锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
           // 这个操作在上面的sync类实现中
            return nonfairTryAcquire(acquires);
        }
    }

非公平锁nonfairTryAcquire实现,在sync类中

// sync类中实现方法,为方便和公平锁对比,放到这里
       /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 这里不判断等待队列里是否有等待更久的线程,直接cas操作
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

释放锁过程
释放锁过程公平锁和非公平锁一致,这里面还涉及到一些aqs的知识,下节再说。

/**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        // 实现在aqs类中
        sync.release(1);
    }

aqs类中release相关实现

   /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

sync中tryRelease实现

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 为0时,才进行锁释放,因为这是可重入锁
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

锁的初始化
默认初始化为非公平锁,可以通过构造方法指定。

   /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

两者优劣对比

  • 公平锁的性能开销会更大,因为非公平锁可以直接抢锁,如果这个操作在上一个线程刚释放锁,这种情况下是有可能成功的,那么就会省去一次线程阻塞唤醒的代价,这个抢锁线程直接拿到锁。而公平锁只要等待队列中有其它线程,就会添加到队列里面,就会需要一次线程阻塞唤醒的操作,这也是默认锁构造为非公平锁的原因。
  • 非公平锁可能会导致线程饥饿,即后面请求锁的线程很多,并一直获锁成功,那么等待队列中的线程可能永远拿不到锁。

AQS源码实现

这里把aqs相关实现单开一节,上面的源码分析更多地是reentrantLock的具体实现,aqs作为整个concurrent包实现的基石,通过扩展这个抽象类,能实现很多不同的数据结构,除了这里的reentrantLock,还有不可重入锁,共享锁等结构,这些后面再说。
上面等于先讲了其中一种扩展,这也是跟代码的正常思路,下面就先看下reentrantLock独占锁实现过程中,先跳过的aqs代码。

加锁过程

加锁acquire代码
这个是lock方法直接调用的,具体代码见上方公平锁和非公平锁部分。

   /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        // 调用具体实现类的tryAcquire方法,从下面的node类型,可以看出这是独占锁
        // 获锁失败则把线程加到等待队列中,等待唤醒接着抢锁
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter实现
将获锁失败的线程加入到双向链表队列尾部,节点分为独占和共享两种。

   /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        // 根据入参构造独占节点或共享节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 不为空时,直接设置到尾部
        if (pred != null) {
            node.prev = pred;
            // 这里设置失败,说明还有其它线程竞争,走后面的enq
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
      // 存在竞争的情况下,里面的cas操作都可能不成功,需要自旋  
      for (;;) {
            Node t = tail;
            // 懒加载,加载完再执行后面
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued实现
注释里说,已经在队列中的线程获取锁在独占并且忽略中断的模式下,condition的wait方法也调用了这个,这个后面再说。

   /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋,要么获锁,要么中断
            for (;;) {
                // 前一个节点是否是head,head是虚节点,是的话试着加锁
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // head存放的是当前持锁线程
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 获锁失败后是否要阻塞线程,防止无限循环浪费资源
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
image.png

shouldParkAfterFailedAcquire方法实现
这个方法是校验并更新获取锁失败节点的状态,返回true代表线程应该被阻塞

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前置节点状态为signal,说明该节点后面的节点能被唤醒
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                // 删除已经取消状态的节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt实现

// AbstractQueuedSynchronizer
/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // 挂起线程,阻塞调用栈
        LockSupport.park(this);
        return Thread.interrupted();
    }

cancel状态节点的产生

在acquireQueued方法的finally中,如果没有获锁成功,将会cancelAcquire。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
  // 将无效节点过滤
    if (node == null)
        return;
  // 设置该节点不关联任何线程,也就是虚节点
    node.thread = null;
    Node pred = node.prev;
  // 通过前驱节点,跳过取消状态的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
  // 获取过滤后的前驱节点的后继节点
    Node predNext = pred.next;
  // 把当前node的状态设置为CANCELLED
    node.waitStatus = Node.CANCELLED;
  // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
  // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
    // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
    // 如果1和2中有一个为true,再判断当前节点的线程是否为null
    // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
      // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

上面这个代码可以看到,在将node节点踢出的时候,只做了pred.next指向node.next的操作,并没有做node.next.pre指向pred的操作,这是因为防止pred是已经不在队列中的节点,操作不安全,所以这里不设置。
设置前驱指针是在shouldParkAfterFailedAcquire中,遇到cancel状态的节点会不断清除,这个过程是在获锁失败的时候来做的,说明共享资源已被占用,此时操作节点不会出现安全问题。

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

解锁过程

框架中释放锁的代码为:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
    // 具体的同步器实现解锁方法,返回true代表可以释放锁
        if (tryRelease(arg)) {
        Node h = head;
                // 头节点不为空,且头结点不为初始化状态,可以唤醒后面的线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在ReentrantLock里面的公平锁和非公平锁的父类Sync定义了可重入锁的释放锁机制,这个具体的代码见上面的公平锁和非公平锁部分,就是减少state的值,state为0时,认为可以释放锁。

// java.util.concurrent.locks.ReentrantLock.Sync

// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
    // 减少可重入次数
    int c = getState() - releases;
    // 当前线程不是持有锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

这里的判断条件为什么是h != null && h.waitStatus != 0呢

  • h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
  • h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
  • h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。这也是之前shouldParkAfterFailedAcquire中,会先设置前置节点状态为-1,signal状态。
    unparkSuccessor方法实现原理
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void unparkSuccessor(Node node) {
    // 获取头结点waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 获取当前节点的下一个节点
    Node s = node.next;
    // 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}

为什么要从后往前找第一个非Cancelled的节点呢?原因如下。
之前的addWaiter方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;还没执行,如果这个时候执行了unparkSuccessor方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。

中断执行后的恢复流程

唤醒后,会执行return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

再回到acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前interrupted返回。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
            }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如果acquireQueued为True,就会执行selfInterrupt方法。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容。

  • 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
  • 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,感兴趣的同学可以看下ThreadPoolExecutor源码。

小结

Q:某个线程获取锁失败的后续流程是什么呢?

A:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

A:是CLH变体的FIFO双端队列。CLH锁其实就是一种是基于逻辑队列非线程饥饿的一种自旋公平锁,由于是 Craig、Landin 和 Hagersten三位大佬的发明,因此命名为CLH锁。

Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?

A:线程被唤醒后,会根据前置节点是否是head,决定是否继续cas操作获锁。

Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?

A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放。

Q:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?

A:AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现,通过tryAcquire完成加锁过程。

AQS应用

ReentrantLock的可重入锁

ReentrantLock的可重入性是AQS很好的应用之一,在了解完上述知识点以后,我们很容易得知ReentrantLock实现可重入的方法。在ReentrantLock里面,不管是公平锁还是非公平锁,都有一段逻辑。
公平锁:

// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
} else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
}

非公平锁:

// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

if (c == 0) {
    if (compareAndSetState(0, acquires)){
        setExclusiveOwnerThread(current);
        return true;
    }
} else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0) // overflow
        throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
}

从上面这两段都可以看到,有一个同步状态State来控制整体可重入的情况。State是Volatile修饰的,用于保证一定的可见性和有序性。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

接下来看State这个字段主要的过程:

  • State初始化的时候为0,表示没有任何线程持有锁。
  • 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
  • 解锁也是对这个字段-1,一直到0,此线程对锁释放。

JUC中的应用场景

除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:


image.png

自定义同步器

了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具。

public class LeeLock  {

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire (int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease (int arg) {
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively () {
            return getState() == 1;
        }
    }
    
    private Sync sync = new Sync();
    
    public void lock () {
        sync.acquire(1);
    }
    
    public void unlock () {
        sync.release(1);
    }
}

通过我们自己定义的Lock完成一定的同步功能。

public class LeeMain {

    static int count = 0;
    static LeeLock leeLock = new LeeLock();

    public static void main (String[] args) throws InterruptedException {

        Runnable runnable = new Runnable() {
            @Override
            public void run () {
                try {
                    leeLock.lock();
                    for (int i = 0; i < 10000; i++) {
                        count++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    leeLock.unlock();
                }

            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println(count);
    }
}

上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。

参考链接

(JDK)ReentrantLock手撕AQS
不可不说的Java“锁”事
从ReentrantLock的实现看AQS的原理及应用

相关文章

网友评论

      本文标题:reentrantLock锁

      本文链接:https://www.haomeiwen.com/subject/wkrgyrtx.html