美文网首页程序员JAVA学习文集
用火车购票的方式打开 AQS同步器(一)

用火车购票的方式打开 AQS同步器(一)

作者: 小燃儿 | 来源:发表于2020-08-26 23:38 被阅读0次

    引言

    AQS(AbstractQueuedSynchronizer,下文直接使用AQS的简称)是java JUC包下提供的,基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架,使用者可以继承它来实现阻塞锁或者其他功能。由于是一个框架,所以其概念会比较抽象,因此这里会使用乘火车车购票的例子来帮助理解。


    AQS同步器结构-成员变量

    AQS的三个重要的成员变量,其功能都是围绕这三个变量实现的。

      /**
         * Head of the wait queue, lazily initialized.  Except for
         * initialization, it is modified only via method setHead.  Note:
         * If head exists, its waitStatus is guaranteed not to be
         * CANCELLED.
         */
        private transient volatile Node head;
    
        /**
         * Tail of the wait queue, lazily initialized.  Modified only via
         * method enq to add new wait node.
         */
        private transient volatile Node tail;
    
        /**
         * The synchronization state.
         */
        private volatile int state;
    
    • State: 同步器的状态量。类似于火车对外出售的车票总数量。
    • head:等待队列的头节点。类似于车票售完后,第一个预约候补的人,可以理解为此时这人已经再候补了。
    • tail:等待队列的尾节点。类似于所有预约候补的最后一个人。


      image

    AQS同步器结构-Node

    然后来看看Node的结构是怎么样的?

    // 状态:可能退出状态等
    volatile int waitStatus;
    // 前面的人
     volatile Node prev;
    // 后面的人
     volatile Node next;
    // 我自己
     volatile Thread thread;
    /**节点模式:排他或者共享*/
     Node nextWaiter;
    

    waitStatus: 等待状态。 AQS提供四种状态:

    1. CANCELLED:退出状态。类似购车票时,由于官方原因候补失败的人
    2. SIGNAL:唤醒状态。正在候补或者候补完成,此时后面排队的人可以尝试候补了。
    3. CONDITION: 条件状态。这个是指在指定条件下,该乘客才会去预约候补(比如有紧急事务,必须今天去某地)
    4. PROPAGATE:传播状态。对于排队的队列,可以共享候补的信息可以向后传播。


      image

    AQS同步器排他逻辑实现

    排他,是指资源被独占,此时其他线程无法获取此资源,只能等待到当前资源被释放(资源独享,比如买我了车票后,这辆火车就只有我能使用,其他人都不能使用),其原理主要是通过tryAcquire方法返回资源是否被占用,如果是就进入等待状态,如果否就执行当前任务。具体实现如下:

    接下来来看看AQS提供的主要方法(排他和共享)
    排他(资源独享,比如买我了车票后,这辆火车就只有我能使用,其他人都不能使用):

     获取车票,没有获取到则进入等待
     public final void acquire(int arg) {
            if (!tryAcquire(arg) &&  // 尝试获取车票,由于是独占的,所以返回值直接使用true or false
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // 没有买到车的人进行排队候补
                selfInterrupt(); // 是否自我中断(买车票的时候,总是选择可以退票或者不买票了吧)
        }
        addWaiter(Node.EXCLUSIVE): 以排他模式加入到排队当中;
    
    尝试获取车票,否则等待
    final boolean acquireQueued(final Node node, int arg) { // 等待唤醒
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();  // 排在自己前面的人
                    if (p == head && tryAcquire(arg)) { // 是队列第一个,那么自己也可以去尝试候补
                        setHead(node);  // 候补成功后,那自己是第一个了(正在候补)
                        p.next = null; // help GC // 原来的排在第一位的候补成功了就从队列中剔除了
                        failed = false;
                        return interrupted;  // 候补上车
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && // 决定是否需要挂起(见下)
                        parkAndCheckInterrupt())  // 挂起等待,因为前面的人还没获取,自己肯定没机会就不浪费力气去查询候补了
                        interrupted = true;  // 取消候补
                }
            } finally {
                if (failed)
                    cancelAcquire(node); // 由于系统原因人员无法候补,标记为退出状态
            }
        }
    
    
    是否需要进行阻塞
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL) // 前面的人已经是唤醒状态,正在等待有车票的时候
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;  // 自己可以挂起了,只要等待有车票的时候会通知自己
            if (ws > 0) {  // 将排在自己前面退出状态的人都剔除
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 将排在前面的人改为需要唤醒状态
                                                                                     // 并再给一次机会去获取车票
            }
            return false;  // 自己不需要挂起
        }
    

    释放资源:

    释放资源(其中有人退票了或者下车了,又有空余的车票了,就通知队列里的人可以尝试候补):
    public final boolean release(int arg) {
            if (tryRelease(arg)) {  // 自己退票或下车,有空余的车票了
                Node h = head;  // 候补队列中第一人
                if (h != null && h.waitStatus != 0)   // 存在 并且状态已经不是0
                    unparkSuccessor(h);  // 通知下一个人很快就可以候补了
                return true;
            }
            return false;
        }
    
    
         private void unparkSuccessor(Node node) { //第一个人,正在候补
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);  // 将状态设置为0,
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;  // 排在后面的人
            if (s == null || s.waitStatus > 0) { // 剔除退出状态的人
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread); // 通知排在后面且没有退出的人,可以起来关注候补情况了(acquireQueued)
        }
    

    相关文章

      网友评论

        本文标题:用火车购票的方式打开 AQS同步器(一)

        本文链接:https://www.haomeiwen.com/subject/mpjqsktx.html