美文网首页
AQS源码浅析(3)——同步队列排队方法

AQS源码浅析(3)——同步队列排队方法

作者: 墨_0b54 | 来源:发表于2022-04-06 19:44 被阅读0次

    一、 enq往队列里加入一个节点

    node以CAS同步的方式插入队列的完整过程。

    private Node enq(final Node node) { //将节点插入同步队列尾部
        for (;;) {
            Node t = tail;
            if (t == null) { //初始化一个空的node作为头尾节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; //自旋设置node的prev指针(操作只影响node自身,不影响队列)
                if (compareAndSetTail(t, node)) { //注意:如果CAS设置node为Tail尾节点失败,node的prev已经不为null
                    t.next = node; //自旋设置node为tail成功,设置原本尾节点的next
                    return t; //返回node的前驱节点
                }
            }
        }
    }
    

    二、addWaiter根据模式节点创建或者加入队列

    根据模式创建节点并加入等待队列。

    //参数:Node.EXCLUSIVE 为独占模式,Node.SHARED 为共享模式
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) { //尾节点不等于null
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { //设置尾节点
                pred.next = node;
                return node;
            }
        }//尝试快速加入队列尾部,有竞争时或者队列未初始化会失败。
        enq(node);
        return node;
    }
    

    三、setHead设置头节点

    • 将队列头设置为节点,从而出队。
    • 仅由锁的获取方法调用。
    • 为了 GC 和防止不必要的唤醒和遍历,清空未使用的字段。
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    

    四、setHeadAndPropagate设置头节点

    • propagate 大于0代表后续共享模式获取锁能成功,否则都不能成功
    • 设置当前节点为队列头
    • 如果参数propagate > 0 或者head节点状态小于0,并且下一个节点是共享节点,直接传播执行doReleaseShared方法
    • 与setHead的区别是setHeadAndPropagate需要传播唤醒
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())//下一个节点是共享模式节点
                doReleaseShared();//传播doReleaseShared行为
        }
    }
    

    五、doReleaseShared共享模式下是否共享值

    共享模式的释放操作——唤醒后继并确保传播。 (注:独占模式下,如果需要唤醒,释放锁的操作就相当于调用head的unparkSuccessor方法)。唤醒头部SIGNAL状态的节点,如果头节点是状态是0(代表没有可以唤醒的后继),将头节点状态设置为PROPAGATE,可以保证有后继时继续执行doReleaseShared

    private void doReleaseShared() { //同步唤醒后继节点线程并确保传播唤醒
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {//自旋,直到唤醒一个后继,或成功设置head的state为PROPAGATE
            Node h = head;
            if (h != null && h != tail) {//确保同步队列有线程节点
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//head的state为SIGNAL唤醒后继节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改head的state失败,有竞争就自旋
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//唤醒后继节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //head的state设置为PROPAGATE来确保传播
                    continue;                // 失败,自旋
            }
            if (h == head)                   // head节点未改变代表此次操作成功
                break;
        }
    }
    

    六、unparkSuccessor唤醒后继节点线程

    如果node存在后继,唤醒node的后继节点。
    要解除阻塞的线程在后继节点中,通常只是下一个节点。但如果取消或明显为空,则从尾部向后遍历以找到实际未取消的继任者。
    如果不存在:从tail开始,通过prev向head遍历找到node未取消的后继节点然后唤醒

    private void unparkSuccessor(Node node) { 
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0) //清除负数状态
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;//得到node的后继节点
        if (s == null || s.waitStatus > 0) { //不存在或者已取消(大于0代表已取消),从尾部开始向前找后继
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)//唤醒后继线程
            LockSupport.unpark(s.thread);
    }
    

    相关文章

      网友评论

          本文标题:AQS源码浅析(3)——同步队列排队方法

          本文链接:https://www.haomeiwen.com/subject/narrsrtx.html