美文网首页
04 AQS 同步队列

04 AQS 同步队列

作者: 格林哈 | 来源:发表于2020-10-27 11:58 被阅读0次

1 数据结构

  • 阻塞的线程存在哪里?
    • AQS内部有一个Node类的FIFO双向队列,AQS依赖它同步状态。
      • 假如当前线程获取同步状态失败,AQS会将当前线程等待等信息构造成一个节点(Node)将其加入到同步队列,同时阻塞当前线程
      • 当同步状态释放,会把首节点唤醒(公平锁),使其再次获取同步状态。
 static final class Node {
        /** 标记节点是共享模式下等待 */
        static final Node SHARED = new Node();
        /** 标记节点是在独占模式下等待 */
        static final Node EXCLUSIVE = null;
        /** 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态; */
        static final int CANCELLED =  1;
        /** 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */
        static final int SIGNAL    = -1;
        /** 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中 */
        static final int CONDITION = -2;
        /**
         * 表示下一次共享式同步状态获取将会无条件地传播下去
         */
        static final int PROPAGATE = -3;

        /**
        *等待状态
        */
        volatile int waitStatus;

        /** 前驱节点 */
        volatile Node prev;

        /** 后继节点 */
        volatile Node next;

         /** 获取同步状态的线程 */
        volatile Thread thread;

        /**
         * 存储condition队列中的后继节点;
         */
        Node nextWaiter;

        /**
         * 共享模式返回true
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 获取前继节点, 前驱节点不为空的时候使用
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // 用于建立初始标头或SHARED标记
        }

        Node(Thread thread, Node mode) {     // 给addWaiter 使用
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // 给Condition 使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

2 AQS 独占用锁实现

2.1 acquire方法

  • 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法
    public final void acquire(int arg) {
        
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt(); // 根据返回 中断当前线程
    }
    
  static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
  • 主要工作

    • 尝试获取锁
    • 获取失败,执行acquireQueued
  • ReentrantLock.FairSync tryAcquire 方法


   protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // hasQueuedPredecessors 有别的线程在队列中排了当前线程之前
                // cas设置状态为acquires,即lock方法中写死的1 
                // 成功则 设置当前线程独占锁。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 判断是不是重入,当前线程已经持有锁, 不需要cas
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  • addWaiter 根据给的的共享,或者独占模式 创建节点,并加入到队列
     private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • acquireQueued方法 循环的尝试获取锁,直到成功为止,最后返回中断标志位
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
        //中断标志
            boolean interrupted = false;
            for (;;) {
            // 获取前继节点
                final Node p = node.predecessor();
                //如果前继节点是head,则尝试获取
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果p不是head或者获取锁失败,判断是否需要进行park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  • shouldParkAfterFailedAcquire 获取资源失败,什么时候park
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果前一个节点的状态是SIGNAL,则需要park
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
            //  waitStatus == 1 因为超时或者中断,节点会被设置为取消状态, 删除状态是已取消的节点。
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             // 其他情况,设置前继节点的状态为SIGNAL。 不需要park
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • parkAndCheckInterrupt park并且校验中断
   //判断线程中断的状态实际上是为了不让循环一直执行 让他阻塞, 一直cas cpu飙升
    private final boolean parkAndCheckInterrupt() {
        // 当前线程是非中断状态,则在执行park时被阻塞
        //如果当前线程是中断状态,则park方法不起作用,会立即返回,然后parkAndCheckInterrupt方法会获取中断的状态,也就是true,并复位;
        LockSupport.park(this); 
        
        return Thread.interrupted();  //返回中断标识并进行复位
    }
  • cancelAcquire如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态
 private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        // 通过前继节点跳过取消状态的node pred 状态 waitStatus<=0
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        // 取过滤后的前继节点的后继节点 predNext.waitStatus = 1
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        // 设置状态为取消状态
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        // 1.如果当前节点是tail:
        // 尝试更新tail节点,设置tail为pred;
        // 更新失败则返回,成功则设置tail的后继节点为null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
        
            // 如果当前节点不是head的后继节点
            // 判断当前节点的前继节点的状态是否是SIGNAL,如果不是则尝试设置前继节点的状态为SIGNAL
            // 上面两个条件如果有一个返回true,则再判断前继节点的thread是否不为空
            // 若满足以上条件,则尝试设置当前节点的前继节点的后继节点为当前节点的后继节点,也就是相当于将当前节点从队列中删除
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
            // // 3.如果是head的后继节点或者状态判断或设置失败,则唤醒当前节点的后继节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

相关文章

  • AbstractQueuedSynchronizer 队列同步器

    AbstractQueuedSynchronizer 队列同步器(AQS) 队列同步器 (AQS), 是用来构建锁...

  • AQS框架

    AQS(AbstractQueuedSynchronized)队列同步器理解AQS需要知道1.队列结构,2.同步的...

  • JUC原理之AQS

    AQS AQS即AbstractQueuedSynchronizer(抽象队列同步器)。AQS是Reentrant...

  • 公平锁和非公平锁

    AQS内部维护着一个FIFO队列,该队列就是CLH同步队列。CLH同步队列是一个FIFO双向队列,AQS依赖它来完...

  • 5 AQS

    AQS简介 AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件...

  • 如何封装同步队列的线程池

    同步队列 看到同步队列,第一想到的是AQS。队列同步器(AQS)是用来构建锁或者其他同步组件的基础框架,使用一个i...

  • 并发七:AQS队列同步器实现分析

    AQS 队列同步器(AbstractQueuedSynchronizer)简称AQS,是J.U.C同步构件的基础,...

  • Java基础-线程 (五)

    队列同步器(AQS:AbstractQueuedSynchronizer) AQS是用来构建锁或者其他同步组件的基...

  • Day30

    AQS 同步等待队列 & 条件等待队列 ReentrantLock同步执行,类似synchronized可重入 s...

  • JAVA并发编程(七)AQS源码简析

    AQS:AbstractQueuedSynchronizer直译"(抽象)队列同步器"。AQS是java.util...

网友评论

      本文标题:04 AQS 同步队列

      本文链接:https://www.haomeiwen.com/subject/gyztvktx.html