美文网首页
Java并发包基础——AQS源码解析

Java并发包基础——AQS源码解析

作者: 橙味菌 | 来源:发表于2019-10-03 16:49 被阅读0次

    AQS

    AbstractQueuedSynchronizer 提供了一个基于FIFO队列用于构建锁或其他相关同步装置的基础框架

    继承关系

    继承自AbstractOwnableSynchronizer

    成员变量 序列号 serialVersionUID
    线程(独占模式) exclusiveOwnerThread
    函数 获取独占线程 getExclusiveOwnerThread()
    设置独占线程 setExclusiveOwnerThread(Thread)

    数据结构

    Sync双向队列 Condition单向队列

    Node

    Node 每个被阻塞的线程都会被封装成一个Node结点,Node结点有前后Node的引用以及结点状态和结点对应线程 ​ 结点获取资源方式有共享式和独占式 ​

    节点状态 含义
    CANCELLED 表示此节点代表的线程已经被取消 1
    默认状态,无含义 0
    SIGNAL 表示此节点的下一个紧邻节点应该被unpark -1
    CONDITION 表示此节点的线程正在等待条件 (Condition单向队列的节点只有Condition和Cancelled两种情况) -2
    PROPAGATE 表示下一个aquireShared应该无条件传播 -3

    成员变量

    含义 变量
    队列首 head
    队列尾 tail
    资源 state
    自旋时间(默认1000) spinForTimeoutThreshold
    Unsafe实例 unsafe
    内存偏移地址,用于CAS stateOffset headOffset tailOffset waitStatusOffset nextOffset

    成员函数

    公共API

    作用 接口
    资源 查询资源数 getState
    独占资源 获取资源(独占,无视中断) acquire(int 获取资源数)
    获取资源(独占,可中断) acquireInterruptibly
    获取资源(独占,可中断,限时) tryAcquireNanos
    释放资源(独占) release
    共享资源 获取资源(共享,无视中断) acquireShared
    获取资源(共享,可中断) acquireSharedInterruptibly
    获取资源(共享,可中断,限时) tryAcquireSharedNanos
    释放资源(共享) releaseShared
    队列线程 队列是否为空 hasQueuedThreads
    是否有线程在获取资源 hasContended
    获取队列中第一个非head线程 getFirstQueuedThread
    线程是否在队列中 isQueued(Thread)
    判断是否有其他线程排在当前线程前 hasQueuedPredecessors
    获取队列长度 getQueueLength
    获取队列中的线程集合 getQueuedThreads
    获取队列中的独占线程集合 getExclusiveQueuedThreads
    获取队列中的共享线程集合 getSharedQueuedThreads
    条件 并发器是否持有指定条件 owns(ConditionObject)
    并发器中指定条件的队列是否为空 hasWaiters(ConditionObject)
    并发器中指定条件的队列长度 getWaitQueueLength(ConditionObject)
    并发起指定条件的队列中所有线程的集合 getWaitingThreads(ConditionObject)

    开放给子类的API

    作用 接口
    获取资源 getState()
    设置资源 setState()
    CAS设置资源 compareAndSetState()

    子类需实现接口

    作用 接口
    该线程是否正在独占资源。只有用到condition才需要去实现它。 More ActionsisHeldExclusively()
    尝试获取资源(独占方式),返回是否成功 tryAcquire(int)
    尝试释放资源(独占方式),返回是否成功 tryRelease(int)
    尝试获取资源(独占方式),返回负数 tryAcquireShared(int)
    共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。 tryReleaseShared(int)

    ConditionObject

    为AQS增加了控制线程挂起与放下的接口,实现Condition接口

    Condition接口:从Object抽取出与同步相关的方法组成的接口

    条件等待

    public final void await() throws InterruptedException {
     if (Thread.interrupted())
     throw new InterruptedException();
     //节点入列,当前线程封装为Condition型节点作为尾节点的nextWaiter插入队列(队列为空则作为头节点,尾节点Canceled则向上兼并所有Cancelled节点)
     Node node = addConditionWaiter();
     //释放当前资源数的资源并保存在临时变量中
     int savedState = fullyRelease(node);
     int interruptMode = 0;
     //节点不在Sync队列时——挂起
     while (!isOnSyncQueue(node)) {
     LockSupport.park(this);
     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
     break;
     }
     //入列获取资源
     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
     interruptMode = REINTERRUPT;
     if (node.nextWaiter != null) // clean up if cancelled
     unlinkCancelledWaiters();
     if (interruptMode != 0)
     reportInterruptAfterWait(interruptMode);
    }
    

    唤醒

    public final void signal() {
     if (!isHeldExclusively())
     throw new IllegalMonitorStateException();
     Node first = firstWaiter;
     if (first != null)
     doSignal(first);
    }
    ​
    ​
    public final void signalAll() {
     if (!isHeldExclusively())
     throw new IllegalMonitorStateException();
     Node first = firstWaiter;
     if (first != null)
     doSignalAll(first);
    }
    ​
    private void doSignal(Node first) {
     do {
     if ( (firstWaiter = first.nextWaiter) == null)
     lastWaiter = null;
     first.nextWaiter = null;
     } while (!transferForSignal(first) &&
     (first = firstWaiter) != null);
    }
    ​
    ​
    private void doSignalAll(Node first) {
     lastWaiter = firstWaiter = null;
     do {
     Node next = first.nextWaiter;
     first.nextWaiter = null;
     transferForSignal(first);
     first = next;
     } while (first != null);
    }
    
    final boolean transferForSignal(Node node) {
     //将节点状态更换为默认状态
     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
     return false;
    ​
     //入列
     Node p = enq(node);
     int ws = p.waitStatus;
     //将节点前置节点设置为Signal节点,放下节点对应线程
     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
     LockSupport.unpark(node.thread);
     return true;
    }
    

    源码分析

    acquire

    public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();
    }
    

    先插队尝试获取,失败则以独占模式入队并排队获取,排队获取失败的话中断线程

    acquire流程图

    以独占模式入队——addWaiter(Node.EXCLUSIVE)

    private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
     node.prev = pred;
     if (compareAndSetTail(pred, node)) {
     pred.next = node;
     return node;
     }
     }
     enq(node);
     return node;
    }
    
    addWaiter效果

    排队获取资源

    final boolean acquireQueued(final Node node, int arg) {
     boolean failed = true;
     try {
     boolean interrupted = false;
     for (;;) {
     final Node p = node.predecessor();
     //p是等候队列第一位——尝试获取资源
     if (p == head && tryAcquire(arg)) {
     setHead(node);
     p.next = null;
     failed = false;
     return interrupted;
     }
     //未获取到资源,酌情考虑挂起线程或再次尝试获取
     if (shouldParkAfterFailedAcquire(p, node) &&
     parkAndCheckInterrupt())
     interrupted = true;
     }
     } finally {
     //出现异常亦未获取到资源,取消获取
     if (failed)
     cancelAcquire(node);
     }
    }
    
    acquireQueued流程图

    尝试获取资源失败时判断是否应该挂起——shouldParkAfterFailedAcquire

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     int ws = pred.waitStatus;
     //前一个节点状态为SINGAL——它释放资源会唤醒下一位,可以安心挂起
     if (ws == Node.SIGNAL)
     return true;
     //前一个节点状态CANCELLED——已被取消,向前兼并所有CANCELLED节点,不可挂起
     if (ws > 0) {
     do {
     node.prev = pred = pred.prev;
     } while (pred.waitStatus > 0);
     pred.next = node;
     //前一个节点不是CANCELLED,设置为SINGAL,暂时不可挂起
     } else {
     compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     }
     return false;
    }
    

    取消获取资源——cancelAcquire

    private void cancelAcquire(Node node) {
     if (node == null)
     return;
    ​
     //释放线程
     node.thread = null;
    ​
     // 兼并节点前所有CANCELLED节点
     Node pred = node.prev;
     while (pred.waitStatus > 0)
     node.prev = pred = pred.prev;
    ​
     Node predNext = pred.next;
    ​
     //直接修改状态为CANNELED——无需CAS是因为取消获取资源优先级较高
     node.waitStatus = Node.CANCELLED;
    ​
     // 如果此节点是尾部节点直接移除
     if (node == tail && compareAndSetTail(node, pred)) {
     compareAndSetNext(pred, predNext, null);
    
     } else {
     int ws;
     //要取消获取的节点不是等待中的第一个节点/前一个节点是SINGAL节点/前一个节点CAS为SAINGAL节点成功
     //直接删除此节点,在前一个节点后拼解后续节点
     if (pred != head &&
     ((ws = pred.waitStatus) == Node.SIGNAL ||
     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
     pred.thread != null) {
     Node next = node.next;
     if (next != null && next.waitStatus <= 0)
     compareAndSetNext(pred, predNext, next);
     } else {
     //取消获取的节点是等待中的第一个节点/前一个节点不是SINGAL节点
     //放下后续线程
     unparkSuccessor(node);
     }
    ​
     node.next = node; // help GC
     }
    }
    

    Release

    public final boolean release(int arg) {
     //尝试释放资源,成功则放下后续节点并置换head
     if (tryRelease(arg)) {
     Node h = head;
     if (h != null && h.waitStatus != 0)
     unparkSuccessor(h);
     return true;
     }
     return false;
    }
    

    独占式获取释放资源只在释放时放下线程,因而通常情况下只有head持有资源而后续节点的线程统统被挂起

    AcquireShared

    public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
     doAcquireShared(arg);
    }
    
    private void doAcquireShared(int arg) {
     //将当前线程封装成Shared型节点入队
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
     boolean interrupted = false;
     for (;;) {
     final Node p = node.predecessor();
     //排在队首(前一位就是head)——尝试获取
     if (p == head) {
     int r = tryAcquireShared(arg);
     if (r >= 0) {
     //第一位获取成功——设置为head并doReleaseShared
     setHeadAndPropagate(node, r);
     p.next = null; // help GC
     if (interrupted)
     selfInterrupt();
     failed = false;
     return;
     }
     }
     //未成功获取——决定是否挂起
     if (shouldParkAfterFailedAcquire(p, node) &&
     parkAndCheckInterrupt())
     interrupted = true;
     }
     } finally {
     if (failed)
     cancelAcquire(node);
     }
    }
    

    共享式获取资源在成功获取资源时会调用doReleaseShared放下后续被挂起的线程,只要资源充分,可同时有多个线程持有

    releaseShared

    public final boolean releaseShared(int arg) {
     if (tryReleaseShared(arg)) {
     doReleaseShared();
     return true;
     }
     return false;
    }
    
    private void doReleaseShared() {    
     for (;;) {
     Node h = head;
     if (h != null && h != tail) {
     int ws = h.waitStatus;
     //head是Signal节点——设置为默认节点并释放后续节点
     if (ws == Node.SIGNAL) {
     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
     continue;
     unparkSuccessor(h);
     }
     //head是默认节点——设置为Propagate节点
     else if (ws == 0 &&
     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
     continue;                // loop on failed CAS
     }
     if (h == head)                   // loop if head changed
     break;
     }
    }
    

    相关文章

      网友评论

          本文标题:Java并发包基础——AQS源码解析

          本文链接:https://www.haomeiwen.com/subject/jhidpctx.html