美文网首页
JUC源码走读

JUC源码走读

作者: 7d972d5e05e8 | 来源:发表于2019-12-03 23:44 被阅读0次

    一、公平锁和非公平锁

    非公平锁的源码:

       /**
         * Sync object for non-fair locks
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    可以看到非公平锁的lock,上来就先尝试获取锁。compareAndSetState(0,1)。失败的话,走acquire方法。
    公平锁的源码:

     /**
         * Sync object for fair locks
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    可以看到,lock方法按部就班的执行acquire方法。我们再看下这个acquire方法的源码:

       public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    咱们就先看下tryAcquire方法,该方法在AbstractQueuedSynchronizer类中是一个模板方法,抽象的。需要子类实现。很明显,该方法在非公平和公平锁中取实现。先看下,NonfairSync对tryAcquire的实现:

     protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
     /**
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    可以看到,其实现中,如果发现c=0没有线程获取锁的情况下,上来就compare下去拿锁,不管先来后到。如果锁已经被占用了,看下是否是自己线程,这个是可重入的实现。否则try拿不到锁,返回false。下面再看下FairSync锁的try实现:

         protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    可以看到和NonfairSync的唯一不同,是c==0的时候,compare前先执行hasQueuedPredecessors()方法。进去该方法看下,发现其实现的目的是看下当前锁CLH队列中,head的下一个node是不是自己。如果是自己,那么采取compare获取锁。如果不是自己,那么就不去compare,把机会留给别的线程。🤣多么的公平公正呀,哈哈哈。源码如下:

      public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    这里先总结下:对于公平锁和非公平锁,在获取锁的第一个阶段try阶段,非公平锁是上来先尝试拿锁,公平锁是先看看下一个拿锁是不是自己。

    1.1 拿锁的第二个阶段,添加waiter监视器,其实就是向CLH队列添加一个Node,请看源码。

       public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire方法已经介绍完了,下面介绍下acquireQueued方法。咱们先看下addWaiter方法干了啥?

     private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    这里面实例化了一个Node,把当前线程传进来,表示当前线程需要添加到CLH队列中,等待锁,mode是排他模式。在执行插入方法enq方法前,先执行下compare插入。如果成功了,就返回。失败了,在执行enq方法。这样做的目的是使用cas乐观锁,默认系统锁竞争不大,能提高效率。咱们在看下enq方法,是如果能保证,线程安全且一定能插入到CLH队列中。源码如下:

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    这里恍然大悟了,还是用了cas乐观锁。但是增加了for无限循环去执行😂。这里对CLH进行了懒加载,如果tail为空的时候,才会cas线程安全进行init CLH队列。如果已经初始化过了,那么执行双链表的插入Node操作。把Node插入到tail队尾。

    这里CLH队列插完后,代表线程已经在等待着锁了。那么lock方法可不是插入完Node就结束了,它要自己阻塞等待锁了。就是阻塞等待自己的Node变成CLH队列的Head节点。

    1.2、下面看下lock的第三个阶段,阻塞等待阶段。

    阻塞等待阶段就是上面看到一半的方法acquireQueued。它里面的参数addWaiter,1.1已经介绍了。就看下acquireQueued内部的源码了:

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    看代码可知,其思路就是无限循环自己的Node节点,有没有成为Head的next。如果成了,就去try拿锁。如果还没成Head的next,那么就执行shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()。从这个方法就可以知道,只有Head节点释放锁之后,其next节点才有机会获取锁。这里先放一个疑问:如果Head拿到锁了,还没释放,那么Next节点线程执行到这里,发现自己成了next,那么会执行tryAcquire,这个时候会拿到锁吗?为什么?我们下一节讲解。

    1.3、在看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法。

    其实这两个方法比较简单了,shouldParkAfterFailedAcquire如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    它决定当前node能不能park阻塞。决定的依据是是该Node的前驱节点是不是SIGNAL状态。如果是,那么直接阻塞。如果不是,是>0的话,表示该Node的前驱节点已经取消获取锁了,可以安全的删除它了。就有了上面do{}while()方法,一直向前找到状态小于0的Node。如果<0且不是SIGNAL,那么就需要把它变成SIGNAL状态。然后第二次进入该方法的时候,就必然是true了。parkAndCheckInterrupt方法源码如下:

      private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    注意线程就是阻塞在park这里了。该线程的字节码会执行在这里,停止住了。让出cpu给其他线程,该线程休眠阻塞,等待其他线程唤醒。其实就是其他线程执行unlock唤醒。后面再说。如果park后被唤醒了,说明肯定有其他线程执行了unlock操作,那么该Node的线程,再次检查自己是不是Head的next节点,在执行一遍for循环里面的操作。

    到此,一次lock操作,到park方法这里算是暂时结束了。这里,如果锁竞争大的话,肯定会被多次唤醒,然后发现不是自己该获取的,然后再次park。就这样经历:park -> unpark -> park.....,一直这样下去。直到自己成为Head的next节点。

    二、这节介绍lock的兄弟unlock方法

    unlock方法的源码很简单,如下:

     public void unlock() {
            sync.release(1);
        }
    

    release方法的源码,如下:

       public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    可以看到核心方法在tryRelease上,该方法可以预见,应该是修改CLH队列的status,让status回到它正常的值。其源码如下:

    protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    看到c=status - releases,正是让status回归正常。如果它持有锁,并且会根据重入的次数,一次递减。恰好是最后一个递减(c=0),那么它就释放锁,返回值为true。回到上面的release方法,如果返回值为true,那么就会执行unparkSuccessor方法,用来通知CLH的Head节点后面的线程。源码如下:

    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    该方法的参数Node其实就是CLH队列的Head节点。其实为啥要先判断ws<0? 先看下面的node.next节点,如果该节点已经取消获取锁的话,那么会从tail往前遍历,找到第一个waitStatus<=0的节点,去唤醒它。
    我理解这么做的目的应该是:
    1、如果遇到Head.next节点,取消等待锁的话,那么很有可能是前面的节点占用锁太长,导致后面的线程节点超时,取消等待了。那么从Head往后遍历,大概率也是超时,可能要遍历很久才能找到一个合适的节点。从后往前遍历,毕竟越后,说明插入时间最近,最有可能是合适的节点,提高了效率。

    1.3、unlock后,会唤醒指定的succssor节点。那么我们继续1.1介绍的,线程Node park住的地方。

    lock的源码如下:

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    park这里终于可以返回了,然后检查下它在阻塞期间,是否被其他线程中断过,并返回中断的标志。再看下面的源码:

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    从parkAndCheckInterrupt返回后,又可以执行到for循环的开始位置了,会重新tryAcquire。这个时候,基本上该节点就是Head的next节点了。去获取锁的时候,CLH队列的status基本上=0,cas就会成功,进而拿到锁。整个过程终于结束,总结来看整个lock方法总结在如下:

      if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
    

    返回值是interrupted,可能为true,也可能false。固其返回值,并不能代表锁的获取状态,这点要注意。

    相关文章

      网友评论

          本文标题:JUC源码走读

          本文链接:https://www.haomeiwen.com/subject/xjohgctx.html