美文网首页
JUC源码走读

JUC源码走读

作者: 7d972d5e05e8 | 来源:发表于2019-12-03 23:44 被阅读0次

一、公平锁和非公平锁

非公平锁的源码:

   /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

可以看到非公平锁的lock,上来就先尝试获取锁。compareAndSetState(0,1)。失败的话,走acquire方法。
公平锁的源码:

 /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

可以看到,lock方法按部就班的执行acquire方法。我们再看下这个acquire方法的源码:

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

咱们就先看下tryAcquire方法,该方法在AbstractQueuedSynchronizer类中是一个模板方法,抽象的。需要子类实现。很明显,该方法在非公平和公平锁中取实现。先看下,NonfairSync对tryAcquire的实现:

 protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

 /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可以看到,其实现中,如果发现c=0没有线程获取锁的情况下,上来就compare下去拿锁,不管先来后到。如果锁已经被占用了,看下是否是自己线程,这个是可重入的实现。否则try拿不到锁,返回false。下面再看下FairSync锁的try实现:

     protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可以看到和NonfairSync的唯一不同,是c==0的时候,compare前先执行hasQueuedPredecessors()方法。进去该方法看下,发现其实现的目的是看下当前锁CLH队列中,head的下一个node是不是自己。如果是自己,那么采取compare获取锁。如果不是自己,那么就不去compare,把机会留给别的线程。🤣多么的公平公正呀,哈哈哈。源码如下:

  public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

这里先总结下:对于公平锁和非公平锁,在获取锁的第一个阶段try阶段,非公平锁是上来先尝试拿锁,公平锁是先看看下一个拿锁是不是自己。

1.1 拿锁的第二个阶段,添加waiter监视器,其实就是向CLH队列添加一个Node,请看源码。

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire方法已经介绍完了,下面介绍下acquireQueued方法。咱们先看下addWaiter方法干了啥?

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

这里面实例化了一个Node,把当前线程传进来,表示当前线程需要添加到CLH队列中,等待锁,mode是排他模式。在执行插入方法enq方法前,先执行下compare插入。如果成功了,就返回。失败了,在执行enq方法。这样做的目的是使用cas乐观锁,默认系统锁竞争不大,能提高效率。咱们在看下enq方法,是如果能保证,线程安全且一定能插入到CLH队列中。源码如下:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这里恍然大悟了,还是用了cas乐观锁。但是增加了for无限循环去执行😂。这里对CLH进行了懒加载,如果tail为空的时候,才会cas线程安全进行init CLH队列。如果已经初始化过了,那么执行双链表的插入Node操作。把Node插入到tail队尾。

这里CLH队列插完后,代表线程已经在等待着锁了。那么lock方法可不是插入完Node就结束了,它要自己阻塞等待锁了。就是阻塞等待自己的Node变成CLH队列的Head节点。

1.2、下面看下lock的第三个阶段,阻塞等待阶段。

阻塞等待阶段就是上面看到一半的方法acquireQueued。它里面的参数addWaiter,1.1已经介绍了。就看下acquireQueued内部的源码了:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

看代码可知,其思路就是无限循环自己的Node节点,有没有成为Head的next。如果成了,就去try拿锁。如果还没成Head的next,那么就执行shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()。从这个方法就可以知道,只有Head节点释放锁之后,其next节点才有机会获取锁。这里先放一个疑问:如果Head拿到锁了,还没释放,那么Next节点线程执行到这里,发现自己成了next,那么会执行tryAcquire,这个时候会拿到锁吗?为什么?我们下一节讲解。

1.3、在看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法。

其实这两个方法比较简单了,shouldParkAfterFailedAcquire如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

它决定当前node能不能park阻塞。决定的依据是是该Node的前驱节点是不是SIGNAL状态。如果是,那么直接阻塞。如果不是,是>0的话,表示该Node的前驱节点已经取消获取锁了,可以安全的删除它了。就有了上面do{}while()方法,一直向前找到状态小于0的Node。如果<0且不是SIGNAL,那么就需要把它变成SIGNAL状态。然后第二次进入该方法的时候,就必然是true了。parkAndCheckInterrupt方法源码如下:

  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

注意线程就是阻塞在park这里了。该线程的字节码会执行在这里,停止住了。让出cpu给其他线程,该线程休眠阻塞,等待其他线程唤醒。其实就是其他线程执行unlock唤醒。后面再说。如果park后被唤醒了,说明肯定有其他线程执行了unlock操作,那么该Node的线程,再次检查自己是不是Head的next节点,在执行一遍for循环里面的操作。

到此,一次lock操作,到park方法这里算是暂时结束了。这里,如果锁竞争大的话,肯定会被多次唤醒,然后发现不是自己该获取的,然后再次park。就这样经历:park -> unpark -> park.....,一直这样下去。直到自己成为Head的next节点。

二、这节介绍lock的兄弟unlock方法

unlock方法的源码很简单,如下:

 public void unlock() {
        sync.release(1);
    }

release方法的源码,如下:

   public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

可以看到核心方法在tryRelease上,该方法可以预见,应该是修改CLH队列的status,让status回到它正常的值。其源码如下:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

看到c=status - releases,正是让status回归正常。如果它持有锁,并且会根据重入的次数,一次递减。恰好是最后一个递减(c=0),那么它就释放锁,返回值为true。回到上面的release方法,如果返回值为true,那么就会执行unparkSuccessor方法,用来通知CLH的Head节点后面的线程。源码如下:

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

该方法的参数Node其实就是CLH队列的Head节点。其实为啥要先判断ws<0? 先看下面的node.next节点,如果该节点已经取消获取锁的话,那么会从tail往前遍历,找到第一个waitStatus<=0的节点,去唤醒它。
我理解这么做的目的应该是:
1、如果遇到Head.next节点,取消等待锁的话,那么很有可能是前面的节点占用锁太长,导致后面的线程节点超时,取消等待了。那么从Head往后遍历,大概率也是超时,可能要遍历很久才能找到一个合适的节点。从后往前遍历,毕竟越后,说明插入时间最近,最有可能是合适的节点,提高了效率。

1.3、unlock后,会唤醒指定的succssor节点。那么我们继续1.1介绍的,线程Node park住的地方。

lock的源码如下:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

park这里终于可以返回了,然后检查下它在阻塞期间,是否被其他线程中断过,并返回中断的标志。再看下面的源码:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

从parkAndCheckInterrupt返回后,又可以执行到for循环的开始位置了,会重新tryAcquire。这个时候,基本上该节点就是Head的next节点了。去获取锁的时候,CLH队列的status基本上=0,cas就会成功,进而拿到锁。整个过程终于结束,总结来看整个lock方法总结在如下:

  if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

返回值是interrupted,可能为true,也可能false。固其返回值,并不能代表锁的获取状态,这点要注意。

相关文章

网友评论

      本文标题:JUC源码走读

      本文链接:https://www.haomeiwen.com/subject/xjohgctx.html