美文网首页
java并发包底层原理

java并发包底层原理

作者: 霸体 | 来源:发表于2020-09-16 19:25 被阅读0次

前言

JDK并发包是基于两个基础类 Unsafe 和 LockSupport 实现的:

  • Unsafe 提供基于 CPU 原子指令(而非 os 系统调用)的内存 CAS 操作
  • LockSupport 提供基于os系统调用的线程阻塞、唤醒

在上面两个核心类的基础上,JDK并发包基于链表实现的队列,实现了核心类 AbstractQueuedSynchronizer, 这个同步器是并发包其他一切工具类的底层实现核心;

AQS 源码分析

由于JDK里AQS的源码风格比较像C++,易读性也比较差,我重新整理了下代码注释和结构,便于分析

package zxl.org.aqs;

/**
 * A node of the AQS queues. Each node stands for one waiting thread.
 *
 * @author zhouxiliang
 * @date 2020/9/11 19:34
 */
public class Node {
    /** Marker stored in {@link #nextWaiter} for a node waiting in shared mode. */
    static final Node SHARED = new Node();
    /** Marker (null) stored in {@link #nextWaiter} for a node waiting in exclusive mode. */
    static final Node EXCLUSIVE = null;

    /** waitStatus value: the node has been cancelled. */
    static final int CANCELLED = 1;
    /** waitStatus value: the successor of this node needs to be unparked. */
    static final int SIGNAL = -1;
    /** waitStatus value: the node is waiting on a condition list. */
    static final int CONDITION = -2;
    /** waitStatus value: a shared release should be propagated. */
    static final int PROPAGATE = -3;

    // The key attribute: the thread this node represents.
    volatile Thread thread;

    // Links that let nodes form a doubly linked list.
    volatile Node prev;
    volatile Node next;

    /**
     * Holds {@link #SHARED} for a shared-mode node and null for an
     * exclusive-mode node; condition lists also use it as their
     * singly linked "next" pointer.
     */
    Node nextWaiter;

    /**
     * Current status of the node: zero, or one of {@link #CANCELLED},
     * {@link #SIGNAL}, {@link #CONDITION}, {@link #PROPAGATE}.
     */
    volatile int waitStatus;

    /** Creates an empty node (used for the SHARED marker and for a dummy queue head). */
    public Node() {
    }

    /**
     * Creates a node for a thread waiting in the given mode.
     *
     * @param thread the waiting thread
     * @param mode   {@link #SHARED} or {@link #EXCLUSIVE}
     */
    public Node(Thread thread, Node mode) {
        this.thread = thread;
        this.nextWaiter = mode;
    }

    /**
     * Creates a node for a thread with an explicit initial status.
     *
     * @param thread     the waiting thread
     * @param waitStatus the initial wait status
     */
    public Node(Thread thread, int waitStatus) {
        this.thread = thread;
        this.waitStatus = waitStatus;
    }

    /**
     * @return true if this node was created in shared mode
     */
    public final boolean isShared() {
        return SHARED == nextWaiter;
    }

    /**
     * Returns the previous node.
     *
     * @return the predecessor, never null
     * @throws NullPointerException if there is no predecessor
     */
    public final Node predecessor() throws NullPointerException {
        final Node before = prev;
        if (before != null) {
            return before;
        }
        throw new NullPointerException();
    }
}
package zxl.org.aqs;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;

/**
 * A condition (wait) list of AQS. One sync queue can have any number of
 * condition lists attached to it.
 * <p>
 * This was an inner class in the JDK source and has been pulled out into a
 * top-level class to make it easier to follow; it reaches its owner through
 * the {@code synchronizer} field.
 *
 * @author zhouxiliang
 * @date 2020/9/11 19:43
 */
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    // Timed waits with fewer remaining nanoseconds than this are not parked; the loop re-checks instead.
    private static final long spinForTimeoutThreshold = 1000L;
    // Interrupt mode: re-assert the interrupt flag when the wait ends.
    private static final int REINTERRUPT = 1;
    // Interrupt mode: throw InterruptedException when the wait ends.
    private static final int THROW_IE = -1;

    // Head and tail of the singly linked condition list (linked through Node.nextWaiter).
    private transient Node firstWaiter;
    private transient Node lastWaiter;
    // Every condition belongs to exactly one synchronizer.
    private final AbstractQueuedSynchronizer synchronizer;

    /**
     * Creates a condition bound to the given synchronizer.
     *
     * @param synchronizer the owning synchronizer
     * @throws NullPointerException if synchronizer is null
     */
    public ConditionObject(AbstractQueuedSynchronizer synchronizer) {
        if (synchronizer == null) {
            // Fail fast: every operation of this class dereferences the synchronizer.
            throw new NullPointerException("synchronizer");
        }
        this.synchronizer = synchronizer;
    }

    // ---------------------------------------------------------------
    // Internal helpers
    // ---------------------------------------------------------------

    /**
     * Appends a new CONDITION node for the current thread to the condition list.
     * If the current last node is no longer in CONDITION status, the whole
     * list is cleaned first.
     *
     * @return the newly created node
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != Node.CONDITION) {
            // The tail left CONDITION status, so sweep the list before appending.
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null) {
            // Empty list: the new node is both head and tail.
            lastWaiter = firstWaiter = node;
        } else {
            // Append at the tail; the list is singly linked via nextWaiter.
            t.nextWaiter = node;
            lastWaiter = node;
        }
        return node;
    }

    /**
     * Removes every node whose status is not CONDITION from the condition list.
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        // Last node seen so far that is still in CONDITION status.
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null) {
                    firstWaiter = next;
                } else {
                    trail.nextWaiter = next;
                }
                if (next == null) {
                    lastWaiter = trail;
                }
            } else {
                trail = t;
            }
            t = next;
        }
    }

    /**
     * Detaches nodes from the head of the condition list, one at a time, until
     * {@code synchronizer.transferForSignal} reports success for one of them
     * or the list is exhausted.
     *
     * @param first the current first node of the condition list (non-null)
     */
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null) {
                lastWaiter = null;
            }
            first.nextWaiter = null;
        } while (!synchronizer.transferForSignal(first)
                && (first = firstWaiter) != null);
    }

    /**
     * Empties the condition list and hands every node to
     * {@code synchronizer.transferForSignal}.
     *
     * @param first the current first node of the condition list (non-null)
     */
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            synchronizer.transferForSignal(first);
            first = next;
        } while (first != null);
    }

    /**
     * Checks for an interrupt after a park.
     *
     * @param node the waiting node
     * @return 0 if not interrupted; otherwise THROW_IE or REINTERRUPT depending
     * on the result of {@code synchronizer.transferAfterCancelledWait}
     */
    private int checkInterruptWhileWaiting(Node node) {
        if (!Thread.interrupted()) {
            return 0;
        }
        return synchronizer.transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT;
    }

    /**
     * Acts on the interrupt mode collected during a wait.
     *
     * @param interruptMode THROW_IE, REINTERRUPT, or anything else for "do nothing"
     * @throws InterruptedException if interruptMode is THROW_IE
     */
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE) {
            throw new InterruptedException();
        } else if (interruptMode == REINTERRUPT) {
            AbstractQueuedSynchronizer.selfInterrupt();
        }
    }

    /**
     * Shared epilogue of the interruptible waits: re-acquires through the sync
     * queue, sweeps the condition list if this node is still linked in it, and
     * reports any interrupt.
     *
     * @param node          the node of the waiting thread
     * @param savedState    the value returned by {@code fullyRelease} at the start of the wait
     * @param interruptMode the interrupt mode observed while waiting
     * @throws InterruptedException if the final interrupt mode is THROW_IE
     */
    private void reacquireAndReport(Node node, int savedState, int interruptMode)
            throws InterruptedException {
        if (synchronizer.acquireQueued(node, savedState) && interruptMode != THROW_IE) {
            interruptMode = REINTERRUPT;
        }
        if (node.nextWaiter != null) { // clean up if cancelled
            unlinkCancelledWaiters();
        }
        if (interruptMode != 0) {
            reportInterruptAfterWait(interruptMode);
        }
    }

    // ---------------------------------------------------------------
    // Condition interface
    // ---------------------------------------------------------------

    /**
     * Signals one waiter, see {@link #doSignal(Node)}.
     *
     * @throws IllegalMonitorStateException if {@code isHeldExclusively()} is false
     */
    @Override
    public final void signal() {
        if (!synchronizer.isHeldExclusively()) {
            throw new IllegalMonitorStateException();
        }
        Node first = firstWaiter;
        if (first != null) {
            doSignal(first);
        }
    }

    /**
     * Signals all waiters, see {@link #doSignalAll(Node)}.
     *
     * @throws IllegalMonitorStateException if {@code isHeldExclusively()} is false
     */
    @Override
    public final void signalAll() {
        if (!synchronizer.isHeldExclusively()) {
            throw new IllegalMonitorStateException();
        }
        Node first = firstWaiter;
        if (first != null) {
            doSignalAll(first);
        }
    }

    /**
     * Waits without responding to interrupts:
     * <ol>
     * <li>joins the condition list;</li>
     * <li>parks until the node is reported to be on the sync queue;</li>
     * <li>re-acquires through {@code acquireQueued}.</li>
     * </ol>
     * If an interrupt was seen at any point, the interrupt flag is re-asserted
     * before returning.
     */
    @Override
    public final void awaitUninterruptibly() {
        Node node = addConditionWaiter();
        // NOTE(review): fullyRelease is defined outside this view; presumably it releases
        // the lock completely and returns the state needed to re-acquire it.
        int savedState = synchronizer.fullyRelease(node);
        boolean interrupted = false;
        // Keep parking until the node has moved to the sync queue; interrupts are only recorded.
        while (!synchronizer.isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted()) {
                interrupted = true;
            }
        }
        if (synchronizer.acquireQueued(node, savedState) || interrupted) {
            AbstractQueuedSynchronizer.selfInterrupt();
        }
    }

    /**
     * Same flow as {@link #awaitUninterruptibly()}, but an interrupt ends the
     * parking loop early. Whether the interrupt surfaces as an exception or
     * only as a re-asserted flag depends on the result of
     * {@code transferAfterCancelledWait} (see {@link #checkInterruptWhileWaiting(Node)}).
     *
     * @throws InterruptedException if interrupted on entry, or if the wait ends in THROW_IE mode
     */
    @Override
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                break;
            }
        }
        reacquireAndReport(node, savedState, interruptMode);
    }

    /**
     * Same flow as {@link #await()} with a relative timeout: once the timeout
     * has elapsed the node is handed to {@code transferAfterCancelledWait}
     * instead of waiting for a signal.
     *
     * @param nanosTimeout maximum time to wait, in nanoseconds
     * @return an estimate of the remaining time (zero or negative if the
     * timeout elapsed); {@code Long.MIN_VALUE} if the subtraction overflowed
     * @throws InterruptedException if interrupted on entry, or if the wait ends in THROW_IE mode
     */
    @Override
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        // Remember the request so an arithmetic wrap-around can be detected on return.
        final long initialNanos = nanosTimeout;
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                synchronizer.transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold) {
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                break;
            }
            nanosTimeout = deadline - System.nanoTime();
        }
        reacquireAndReport(node, savedState, interruptMode);
        long remaining = deadline - System.nanoTime();
        // The remaining time can never legitimately exceed the request; if it does,
        // the deadline arithmetic wrapped, so report "timed out" instead of a bogus positive.
        return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
    }

    /**
     * Same flow as {@link #awaitNanos(long)} with an absolute deadline.
     *
     * @param deadline the absolute time to wait until
     * @return false if the deadline passed and {@code transferAfterCancelledWait}
     * returned true; true otherwise
     * @throws InterruptedException if interrupted on entry, or if the wait ends in THROW_IE mode
     */
    @Override
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = synchronizer.transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                break;
            }
        }
        reacquireAndReport(node, savedState, interruptMode);
        return !timedout;
    }

    /**
     * Same flow as {@link #awaitNanos(long)}, but reports a boolean.
     *
     * @param time the maximum time to wait
     * @param unit the unit of {@code time}
     * @return false if the timeout elapsed and {@code transferAfterCancelledWait}
     * returned true; true otherwise
     * @throws InterruptedException if interrupted on entry, or if the wait ends in THROW_IE mode
     */
    @Override
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Node node = addConditionWaiter();
        int savedState = synchronizer.fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!synchronizer.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = synchronizer.transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold) {
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                break;
            }
            nanosTimeout = deadline - System.nanoTime();
        }
        reacquireAndReport(node, savedState, interruptMode);
        return !timedout;
    }

    // ---------------------------------------------------------------
    // Instrumentation helpers
    // ---------------------------------------------------------------

    /**
     * @param sync the synchronizer to compare against
     * @return true if this condition was created for {@code sync}
     */
    final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
        return sync == synchronizer;
    }

    /**
     * Walks the condition list looking for a node in CONDITION status.
     *
     * @return true if at least one such node exists
     * @throws IllegalMonitorStateException if {@code isHeldExclusively()} is false
     */
    protected final boolean hasWaiters() {
        if (!synchronizer.isHeldExclusively()) {
            throw new IllegalMonitorStateException();
        }
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                return true;
            }
        }
        return false;
    }

    /**
     * Walks the condition list counting the nodes in CONDITION status.
     *
     * @return the number of such nodes
     * @throws IllegalMonitorStateException if {@code isHeldExclusively()} is false
     */
    protected final int getWaitQueueLength() {
        if (!synchronizer.isHeldExclusively()) {
            throw new IllegalMonitorStateException();
        }
        int n = 0;
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                ++n;
            }
        }
        return n;
    }

    /**
     * Walks the condition list collecting the threads of nodes in CONDITION status.
     *
     * @return a new collection holding the non-null threads of those nodes
     * @throws IllegalMonitorStateException if {@code isHeldExclusively()} is false
     */
    protected final Collection<Thread> getWaitingThreads() {
        if (!synchronizer.isHeldExclusively()) {
            throw new IllegalMonitorStateException();
        }
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                Thread t = w.thread;
                if (t != null) {
                    list.add(t);
                }
            }
        }
        return list;
    }
}
package zxl.org.aqs;

import sun.misc.Unsafe;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.locks.AbstractOwnableSynchronizer;
import java.util.concurrent.locks.LockSupport;

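/**
 * Core implementation of AQS.
 * The code is only a few hundred lines long, but how much of its essence
 * you take away depends on how carefully you read it.
 * While reading, keep in mind that the logic is not sequential: control
 * effectively jumps back and forth between threads.
 */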
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;
    // Threshold in nanoseconds (1ms = 1000us = 1,000,000ns): timed acquires with no more
    // than this much time left are not parked; the acquire loop simply retries instead.
    private static final long spinForTimeoutThreshold = 1000L;
    // sun.misc.Unsafe instance used for the CAS operations of this class.
    private static final Unsafe unsafe = loadUnsafe();
    /**
     * Memory offsets of the fields that are updated through CAS.
     */
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    /**
     * Obtains the {@code Unsafe} singleton.
     * <p>
     * {@code Unsafe.getUnsafe()} rejects callers that were not loaded by the
     * bootstrap class loader with a {@code SecurityException}. This class lives
     * in an application package, so when the direct call is refused the
     * instance is read reflectively from the private static {@code theUnsafe}
     * field instead.
     *
     * @return the Unsafe instance
     * @throws Error if the instance cannot be obtained either way
     */
    private static Unsafe loadUnsafe() {
        try {
            return Unsafe.getUnsafe();
        } catch (SecurityException ignored) {
            // Expected for application-loaded classes; fall back to reflection below.
        }
        try {
            java.lang.reflect.Field holder = Unsafe.class.getDeclaredField("theUnsafe");
            holder.setAccessible(true);
            return (Unsafe) holder.get(null);
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    /**
     * CAS on the {@code waitStatus} field of a node.
     *
     * @param node   the node to update
     * @param expect the expected current status
     * @param update the new status
     * @return true if the field held {@code expect} and was replaced by {@code update}
     */
    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
        final boolean swapped =
                unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
        return swapped;
    }

    /**
     * CAS on the {@code next} field of a node.
     *
     * @param node   the node to update
     * @param expect the expected current successor
     * @param update the new successor
     * @return true if the field held {@code expect} and was replaced by {@code update}
     */
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        final boolean swapped =
                unsafe.compareAndSwapObject(node, nextOffset, expect, update);
        return swapped;
    }

    /**
     * Decides whether the thread of {@code node} should park after a failed
     * acquire attempt, based on the status of its predecessor.
     * <p>
     * As a side effect this either unlinks cancelled predecessors or tries to
     * switch the predecessor's status to SIGNAL; in both of those cases it
     * returns false so that the caller re-runs its acquire loop first.
     *
     * @param pred the predecessor of {@code node}
     * @param node the node that failed to acquire
     * @return true only if {@code pred} already has status SIGNAL
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // The predecessor is already marked SIGNAL, so it is safe to park.
            return true;
        if (ws > 0) {
            // The predecessor is cancelled: skip backwards over cancelled nodes and relink.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Status is neither SIGNAL nor cancelled (e.g. 0 or PROPAGATE): try to mark it SIGNAL.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // After cleaning up or changing the status, tell the caller not to park yet.
        return false;
    }

    /**
     * Sets the interrupt flag of the calling thread.
     */
    static void selfInterrupt() {
        final Thread self = Thread.currentThread();
        self.interrupt();
    }

    /**
     * The three core members of AQS:
     * the head and tail of a doubly linked list of nodes,
     * and an int-typed (32-bit) state value.
     */
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;

    protected AbstractQueuedSynchronizer() {
    }

    /**
     * CAS on the {@code head} field, used only to install the first head:
     * the expected value is always null. This is an optimistic (lock-free) update.
     *
     * @param update the node to install as head
     * @return true if head was null and is now {@code update}
     */
    private final boolean compareAndSetHead(Node update) {
        final Node expected = null;
        return unsafe.compareAndSwapObject(this, headOffset, expected, update);
    }

    /**
     * CAS on the {@code tail} field.
     *
     * @param expect the expected current tail
     * @param update the node to install as tail
     * @return true if tail held {@code expect} and is now {@code update}
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        final boolean swapped =
                unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        return swapped;
    }

    /**
     * @return the current value of the volatile {@code state} field
     */
    protected final int getState() {
        return this.state;
    }

    /**
     * Unconditionally writes the volatile {@code state} field.
     *
     * @param newState the value to store
     */
    protected final void setState(int newState) {
        this.state = newState;
    }

    /**
     * CAS on the {@code state} field.
     *
     * @param expect the expected current state
     * @param update the new state
     * @return true if state held {@code expect} and is now {@code update}
     */
    protected final boolean compareAndSetState(int expect, int update) {
        final boolean swapped =
                unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        return swapped;
    }

    /**
     * Appends a node to the tail of the sync queue, creating a dummy head
     * first if the queue has not been initialized yet. Retries until the
     * tail CAS succeeds.
     *
     * @param node the node to append
     * @return the previous tail, i.e. the predecessor of {@code node}
     */
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            if (t == null) {
                // Initialization has two steps: CAS head from null, then publish tail.
                // Other threads test the volatile tail, which is assigned last, so they
                // only ever see the queue as either uninitialized or fully initialized.
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // Linking takes three steps (set prev, CAS tail, set next) and is not atomic.
                // The node counts as enqueued once the tail CAS succeeds, even though
                // t.next may not point to it yet.
                // That is why other threads sometimes have to traverse backwards from tail.
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates a node for the current thread in the given mode and appends it
     * to the sync queue. One direct tail CAS is attempted first; if the queue
     * is uninitialized or the CAS fails, {@link #enq(Node)} is used.
     *
     * @param mode Node.SHARED or Node.EXCLUSIVE
     * @return the newly created node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Makes {@code node} the head of the sync queue and clears its
     * {@code thread} and {@code prev} references.
     *
     * @param node the node to become head
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Unparks the thread of the first non-cancelled node after {@code node},
     * if there is one.
     * If the woken thread then acquires, it replaces the head of the sync queue.
     *
     * @param node the node whose successor should be woken
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            // Waking the successor consumes this node's status: SIGNAL / PROPAGATE -> 0.
            compareAndSetWaitStatus(node, ws, 0);
        }
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // next may be missing or cancelled while other threads are modifying the
            // queue, so scan backwards from tail and keep the non-cancelled node
            // closest to this one.
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) {
            LockSupport.unpark(s.thread);
        }
    }

    /**
     * Release action for shared mode.
     * If the head has status SIGNAL, resets it to 0 and unparks its successor.
     * If the head has status 0, switches it to PROPAGATE.
     * The loop repeats when a CAS fails or when the head changed during the pass.
     */
    private void doReleaseShared() {
        for (; ; ) {
            Node h = head;
            // The queue holds at least one node besides the head.
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // A successor marks its predecessor SIGNAL before it parks
                // (see shouldParkAfterFailedAcquire).
                if (ws == Node.SIGNAL) {
                    // SIGNAL must be consumed by this thread before waking the successor.
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Wake the thread of the successor node.
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    // Reached when the status was 0 but the CAS to PROPAGATE lost a race; retry.
                    continue;
                }
            }
            if (h == head) {
                // Head is unchanged, so this pass is complete.
                // Otherwise a woken thread has replaced the head and the loop runs again.
                break;
            }
        }
    }

    /**
     * Makes {@code node} the head and, when propagation is indicated, continues
     * releasing in shared mode.
     * Propagation is indicated when {@code propagate > 0}, or when the old head
     * or the new head is null or has a negative waitStatus. In that case
     * {@link #doReleaseShared()} is called if the successor of {@code node} is
     * null or shared.
     *
     * @param node      the node that just acquired in shared mode
     * @param propagate the value returned by tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // Propagate the shared release to the next waiter.
                doReleaseShared();
        }
    }

    /**
     * Cancels an acquire attempt: marks the node CANCELLED and repairs the
     * queue around it.
     * The successor is either linked to the new non-cancelled predecessor or
     * woken up through {@link #unparkSuccessor(Node)}.
     *
     * @param node the node to cancel; ignored if null
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null) {
            return;
        }
        node.thread = null;
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
            return;
        }
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) {
                compareAndSetNext(pred, predNext, next);
            }
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC

    }


    /**
     * Parks the current thread, then reads and clears its interrupt flag.
     *
     * @return true if the interrupt flag was set when the park returned
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        final boolean wasInterrupted = Thread.interrupted();
        return wasInterrupted;
    }


    /**
     * Acquires in exclusive mode for a node that is already in the sync queue.
     * The node retries {@code tryAcquire} whenever its predecessor is the head;
     * on success it becomes the new head. Between attempts the thread parks
     * when {@link #shouldParkAfterFailedAcquire(Node, Node)} allows it.
     * If the loop exits abnormally, the node is cancelled.
     *
     * @param node the queued node
     * @param arg  the acquire argument passed to tryAcquire
     * @return true if an interrupt was observed while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    // Reached after being unparked with the interrupt flag set; record it.
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive mode for the current thread, aborting on interrupt.
     * 1. Creates an exclusive node for the current thread and appends it to the sync queue.
     * 2. Competes for the lock whenever the node's predecessor is the head.
     * If an interrupt is observed after parking, InterruptedException is thrown
     * and the node is cancelled by the finally block.
     *
     * @param arg the acquire argument passed to tryAcquire
     * @throws InterruptedException if interrupted while waiting
     */
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Same basic flow as {@link #doAcquireInterruptibly(int)}, with a timeout.
     * <p>
     * Acquires in exclusive mode for the current thread. When the timeout
     * elapses the method returns false, and the node is cancelled by the
     * finally block. The thread only parks while the remaining time exceeds
     * {@code spinForTimeoutThreshold}; below that the loop just retries.
     *
     * @param arg          the acquire argument passed to tryAcquire
     * @param nanosTimeout the maximum time to wait, in nanoseconds
     * @return true if acquired; false if the timeout was non-positive or elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Shared-mode counterpart of {@link #acquireQueued(Node, int)}.
     * The difference is that after a successful shared acquire it calls
     * {@link #setHeadAndPropagate(Node, int)}, which may in turn call
     * {@link #doReleaseShared()}.
     * An interrupt observed while waiting is re-asserted before returning.
     *
     * @param arg the acquire argument passed to tryAcquireShared
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Key line: this thread (A) acquired in shared mode, becomes head,
                        // and may wake the thread (B) of the next node.
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    // A woken successor thread (B) resumes here.
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Same as {@link #doAcquireShared(int)}, except that an interrupt observed
     * after parking aborts the wait with InterruptedException; the node is
     * then cancelled by the finally block.
     *
     * @param arg the acquire argument passed to tryAcquireShared
     * @throws InterruptedException if interrupted while waiting
     */
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Shared-mode acquire with a timeout. The thread only parks while the
     * remaining time exceeds {@code spinForTimeoutThreshold}. If the timeout
     * elapses or the thread is interrupted, the node is cancelled by the
     * finally block.
     *
     * @param arg          the acquire argument passed to tryAcquireShared
     * @param nanosTimeout the maximum time to wait, in nanoseconds
     * @return true if acquired; false if the timeout was non-positive or elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


    /**
     * The next four methods are the main extension points; implementations
     * keep their state in the internal int field.
     */

    /**
     * Extension point: attempts to acquire in exclusive mode.
     * This base implementation always throws.
     *
     * @param arg the acquire argument
     * @return true if the acquire succeeded
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Extension point: attempts to release in exclusive mode.
     * This base implementation always throws.
     *
     * @param arg the release argument
     * @return true if the release succeeded (the caller then wakes a queued thread)
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Extension point: attempts to acquire in shared mode.
     * This base implementation always throws.
     *
     * @param arg the acquire argument
     * @return a negative value on failure; callers treat any value {@code >= 0} as success
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Extension point: attempts to release in shared mode.
     * This base implementation always throws.
     *
     * @param arg the release argument
     * @return true if the release succeeded (the caller then runs doReleaseShared)
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Extension point: reports whether the current thread holds the lock.
     * This base implementation always throws.
     *
     * @return true if the current thread holds the lock exclusively
     * @throws UnsupportedOperationException always, unless overridden
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts.
     * A quick {@code tryAcquire} is attempted first; if it succeeds the method
     * returns at once. Otherwise the thread is queued and waits its turn. If an
     * interrupt was observed while queued, the interrupt flag is re-asserted.
     *
     * @param arg the acquire argument passed to tryAcquire
     */
    public final void acquire(int arg) {
        if (tryAcquire(arg)) {
            return;
        }
        final Node waiter = addWaiter(Node.EXCLUSIVE);
        final boolean interruptedWhileQueued = acquireQueued(waiter, arg);
        if (interruptedWhileQueued) {
            selfInterrupt();
        }
    }

    /**
     * Like {@link #acquire(int)}, but responds to interrupts: a pending
     * interrupt on entry, or one observed while queued, raises an exception.
     *
     * @param arg the acquire argument passed to tryAcquire
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        final boolean acquired = tryAcquire(arg);
        if (!acquired) {
            doAcquireInterruptibly(arg);
        }
    }

    /**
     * Acquires in exclusive mode with a timeout, responding to interrupts.
     *
     * @param arg          the acquire argument passed to tryAcquire
     * @param nanosTimeout the maximum time to wait, in nanoseconds
     * @return true if acquired; false if the timeout elapsed
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquire(arg)) {
            return true;
        }
        return doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Releases in exclusive mode. When {@code tryRelease} (the subclass hook)
     * succeeds and the head has a non-zero waitStatus, a thread from the sync
     * queue is woken.
     *
     * @param arg the release argument passed to tryRelease
     * @return the result of tryRelease
     */
    public final boolean release(int arg) {
        if (!tryRelease(arg)) {
            return false;
        }
        final Node h = head;
        // Only a head with a non-zero status has a successor waiting to be woken.
        if (h != null && h.waitStatus != 0) {
            unparkSuccessor(h);
        }
        return true;
    }

    /**
     * Acquires in shared mode, ignoring interrupts. Falls back to queueing
     * when the quick {@code tryAcquireShared} returns a negative value.
     *
     * @param arg the acquire argument passed to tryAcquireShared
     */
    public final void acquireShared(int arg) {
        final int outcome = tryAcquireShared(arg);
        if (outcome >= 0) {
            return;
        }
        doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, responding to interrupts.
     *
     * @param arg the acquire argument passed to tryAcquireShared
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        final int outcome = tryAcquireShared(arg);
        if (outcome < 0) {
            doAcquireSharedInterruptibly(arg);
        }
    }

    /**
     * Acquires in shared mode with a timeout, responding to interrupts.
     *
     * @param arg          the acquire argument passed to tryAcquireShared
     * @param nanosTimeout the maximum time to wait, in nanoseconds
     * @return true if acquired; false if the timeout elapsed
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquireShared(arg) >= 0) {
            return true;
        }
        return doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases in shared mode; on success runs {@link #doReleaseShared()}.
     *
     * @param arg the release argument passed to tryReleaseShared
     * @return the result of tryReleaseShared
     */
    public final boolean releaseShared(int arg) {
        final boolean released = tryReleaseShared(arg);
        if (released) {
            doReleaseShared();
        }
        return released;
    }

    // Queue inspection methods

    /**
     * Reports whether the queue's head and tail references currently differ.
     *
     * @return {@code true} if {@code head} and {@code tail} are not the same reference
     */
    public final boolean hasQueuedThreads() {
        // Read head first, then tail, matching the original left-to-right evaluation.
        Node first = head;
        Node last = tail;
        return first != last;
    }

    /**
     * Reports whether the queue head has ever been set.
     *
     * @return {@code true} if {@code head} is non-null
     */
    public final boolean hasContended() {
        Node first = head;
        return first != null;
    }

    /**
     * Returns the first queued thread, or {@code null} when head and tail coincide.
     *
     * @return {@code null} if {@code head == tail}, otherwise the result of
     *         {@code fullGetFirstQueuedThread()}
     */
    public final Thread getFirstQueuedThread() {
        // Only the fast path is handled here; everything else is relayed.
        if (head == tail) {
            return null;
        }
        return fullGetFirstQueuedThread();
    }

    /**
     * Slow-path lookup of the first queued thread.
     * First inspects {@code head.next}; if that does not yield a thread, walks the
     * queue backwards from {@code tail} via {@code prev} links.
     *
     * @return the thread of {@code head.next} when it is consistently readable,
     *         otherwise the thread of the non-null-thread node closest to head found
     *         by the backward walk, or {@code null} if none is found
     */
    private Thread fullGetFirstQueuedThread() {
        Node h, s;
        Thread st;
        // The same check is written twice on purpose: head.next must exist, still point
        // back at the current head, and carry a non-null thread. NOTE(review): the second,
        // identical attempt looks like a retry against concurrent queue changes — confirm.
        if (((h = head) != null && (s = h.next) != null &&
                s.prev == head && (st = s.thread) != null) ||
                ((h = head) != null && (s = h.next) != null &&
                        s.prev == head && (st = s.thread) != null))
            return st;


        // Fallback: traverse from tail towards head. Each non-null thread overwrites
        // firstThread, so the value left at the end belongs to the node nearest to head.
        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    /**
     * Checks whether some node reachable from {@code tail} via {@code prev} links
     * holds the given thread.
     *
     * @param thread the thread to look for
     * @return {@code true} if a node referencing {@code thread} is found
     * @throws NullPointerException if {@code thread} is {@code null}
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null) {
            throw new NullPointerException();
        }
        Node cursor = tail;
        while (cursor != null) {
            if (cursor.thread == thread) {
                return true;
            }
            cursor = cursor.prev;
        }
        return false;
    }

    /**
     * Reports whether the node right after head exists, is not in shared mode,
     * and still has a thread attached.
     *
     * @return {@code true} only if {@code head} and {@code head.next} are non-null,
     *         {@code head.next} is not shared, and its thread is non-null
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node first = head;
        if (first == null) {
            return false;
        }
        Node second = first.next;
        if (second == null) {
            return false;
        }
        if (second.isShared()) {
            return false;
        }
        return second.thread != null;
    }

    /**
     * Reports whether some thread other than the current one appears to be at the
     * front of the queue.
     *
     * @return {@code true} if head and tail differ and the node after head is either
     *         missing ({@code null}) or belongs to a thread other than the current one;
     *         {@code false} otherwise
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // h != t: the queue is not empty/uninitialized. A null head.next is treated as
        // "someone else is ahead" rather than as "nobody is queued".
        return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
    }


    // Instrumentation and monitoring methods

    /**
     * Counts the nodes reachable from {@code tail} via {@code prev} links that
     * currently have a non-null thread.
     *
     * @return the number of such nodes
     */
    public final int getQueueLength() {
        int count = 0;
        Node cursor = tail;
        while (cursor != null) {
            if (cursor.thread != null) {
                count++;
            }
            cursor = cursor.prev;
        }
        return count;
    }

    /**
     * Collects the threads of all nodes reachable from {@code tail} via
     * {@code prev} links, skipping nodes whose thread is {@code null}.
     *
     * @return a new list of threads in tail-to-head traversal order
     */
    public final Collection<Thread> getQueuedThreads() {
        Collection<Thread> threads = new ArrayList<Thread>();
        Node cursor = tail;
        while (cursor != null) {
            Thread owner = cursor.thread;
            if (owner != null) {
                threads.add(owner);
            }
            cursor = cursor.prev;
        }
        return threads;
    }

    /**
     * Collects the threads of all non-shared nodes reachable from {@code tail}
     * via {@code prev} links, skipping nodes whose thread is {@code null}.
     *
     * @return a new list of threads in tail-to-head traversal order
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        Collection<Thread> threads = new ArrayList<Thread>();
        Node cursor = tail;
        while (cursor != null) {
            if (!cursor.isShared()) {
                Thread owner = cursor.thread;
                if (owner != null) {
                    threads.add(owner);
                }
            }
            cursor = cursor.prev;
        }
        return threads;
    }

    /**
     * Collects the threads of all shared-mode nodes reachable from {@code tail}
     * via {@code prev} links, skipping nodes whose thread is {@code null}.
     *
     * @return a new list of threads in tail-to-head traversal order
     */
    public final Collection<Thread> getSharedQueuedThreads() {
        Collection<Thread> threads = new ArrayList<Thread>();
        Node cursor = tail;
        while (cursor != null) {
            if (cursor.isShared()) {
                Thread owner = cursor.thread;
                if (owner != null) {
                    threads.add(owner);
                }
            }
            cursor = cursor.prev;
        }
        return threads;
    }

    /**
     * Builds a diagnostic string: the superclass {@code toString()} followed by the
     * current state value and whether the queue is reported as empty or non-empty
     * by {@code hasQueuedThreads()}.
     *
     * @return a string of the form {@code <super>[State = N, empty queue]} or
     *         {@code <super>[State = N, nonempty queue]}
     */
    @Override
    public String toString() {
        int s = getState();
        // The "non" prefix turns "empty queue" into "nonempty queue".
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() +
                "[State = " + s + ", " + q + "empty queue]";
    }


    // Internal support methods for Conditions

    /**
     * Decides whether the given node is currently on the sync queue.
     *
     * @param node the node to check
     * @return {@code false} if the node's status is {@code CONDITION} or it has no
     *         predecessor; {@code true} if it has a successor; otherwise the result
     *         of searching for it backwards from the tail
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION) {
            return false;
        }
        if (node.prev == null) {
            return false;
        }
        // A node that already has a successor must be on the queue; otherwise
        // fall back to scanning from the tail.
        return node.next != null || findNodeFromTail(node);
    }

    /**
     * Searches for {@code node} by walking backwards from {@code tail} along
     * {@code prev} links.
     *
     * @param node the node to look for
     * @return {@code true} if the walk reaches {@code node}; {@code false} if it
     *         runs off the end of the queue first
     */
    private boolean findNodeFromTail(Node node) {
        Node cursor = tail;
        // The identity check comes before the null check on every step.
        while (cursor != node) {
            if (cursor == null) {
                return false;
            }
            cursor = cursor.prev;
        }
        return true;
    }

    /**
     * Transfers a node out of the CONDITION state and appends it to the sync
     * (contention) queue. Steps, in order:
     * 1. CAS the node's waitStatus from CONDITION to 0; if that fails, return false.
     * 2. Enqueue the node via {@code enq(node)}.
     * 3. If the node returned by {@code enq} has a positive waitStatus (cancelled), or
     *    its waitStatus cannot be CAS-ed to SIGNAL, unpark the transferred node's thread.
     *
     * @param node the node to transfer
     * @return {@code true} if the node was transferred; {@code false} if its
     *         waitStatus was no longer CONDITION
     */
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append node to the end of the AQS sync queue. Per the original author's note,
        // enq returns the previous tail (node's predecessor) — TODO confirm against enq.
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 means p is cancelled (CANCELLED is the only positive status constant).
        // A failed CAS means another thread changed p's waitStatus after it was read above.
        // In either case p is not marked SIGNAL here, so node's thread is unparked directly.
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Attempts to move a node to the sync queue after its wait was cancelled.
     * If this call wins the CAS from {@code CONDITION} to 0, it enqueues the node
     * itself. Otherwise it yields in a loop until {@code isOnSyncQueue(node)}
     * becomes true.
     *
     * @param node the node whose wait was cancelled
     * @return {@code true} if this call changed the status and enqueued the node;
     *         {@code false} if the CAS failed and the node was found on the sync
     *         queue after spinning
     */
    final boolean transferAfterCancelledWait(Node node) {
        boolean wonRace = compareAndSetWaitStatus(node, Node.CONDITION, 0);
        if (!wonRace) {
            // Lost the CAS: spin politely until the node shows up on the sync queue.
            while (!isOnSyncQueue(node)) {
                Thread.yield();
            }
            return false;
        }
        enq(node);
        return true;
    }

    /**
     * Calls {@code release} with the current state value.
     * If {@code release} returns false or anything inside the attempt throws, the
     * node's waitStatus is set to {@code CANCELLED} before the exception propagates.
     *
     * @param node the node to mark as cancelled on failure
     * @return the state value that was read and passed to {@code release}
     * @throws IllegalMonitorStateException if {@code release} returns {@code false}
     */
    final int fullyRelease(Node node) {
        boolean released = false;
        try {
            int savedState = getState();
            released = release(savedState);
            if (!released) {
                throw new IllegalMonitorStateException();
            }
            return savedState;
        } finally {
            // Covers both a false result from release and any exception thrown above.
            if (!released) {
                node.waitStatus = Node.CANCELLED;
            }
        }
    }

    // Instrumentation methods for conditions

    /**
     * Asks the given condition whether it is owned by this synchronizer.
     *
     * @param condition the condition to query
     * @return the result of {@code condition.isOwnedBy(this)}
     * @throws NullPointerException if {@code condition} is {@code null}
     */
    public final boolean owns(ConditionObject condition) {
        boolean ownedByThis = condition.isOwnedBy(this);
        return ownedByThis;
    }

    /**
     * Delegates to {@code condition.hasWaiters()} after verifying ownership.
     *
     * @param condition the condition to query
     * @return the result of {@code condition.hasWaiters()}
     * @throws IllegalArgumentException if {@code owns(condition)} is {@code false}
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (owns(condition)) {
            return condition.hasWaiters();
        }
        throw new IllegalArgumentException("Not owner");
    }

    /**
     * Delegates to {@code condition.getWaitQueueLength()} after verifying ownership.
     *
     * @param condition the condition to query
     * @return the result of {@code condition.getWaitQueueLength()}
     * @throws IllegalArgumentException if {@code owns(condition)} is {@code false}
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (owns(condition)) {
            return condition.getWaitQueueLength();
        }
        throw new IllegalArgumentException("Not owner");
    }

    /**
     * Delegates to {@code condition.getWaitingThreads()} after verifying ownership.
     *
     * @param condition the condition to query
     * @return the result of {@code condition.getWaitingThreads()}
     * @throws IllegalArgumentException if {@code owns(condition)} is {@code false}
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (owns(condition)) {
            return condition.getWaitingThreads();
        }
        throw new IllegalArgumentException("Not owner");
    }


}

一些参考资料

ReentrantLock实现-AQS

AtomicInteger实现-CAS

相关文章

网友评论

      本文标题:java并发包底层原理

      本文链接:https://www.haomeiwen.com/subject/vlzlektx.html