美文网首页
AQS之独占锁

AQS之独占锁

作者: donglq | 来源:发表于2017-12-16 15:54 被阅读0次

简介

AbstractQueuedSynchronizer(简称AQS)是JAVA中用来实现锁的基础框架,其内部维护了一个等待队列,用来存储等待获取锁的线程。队列遵循FIFO(先进先出)的原则,每次入队列都是添加到队列的尾部,tail总是指向尾部节点;出队列就是将head节点设置指向head的next节点,head节点中没有线程,其代表当前获取到锁的线程。其可实现共享锁和独占锁两种方式的锁。

+------+  prev +-----+       +-----+
| head | <---- |     | <---- | tail|  
+------+       +-----+       +-----+

队列节点类

static final class Node {

    /** 共享模式标识 */
    static final Node SHARED = new Node();

    /** 独占模式标识 */
    static final Node EXCLUSIVE = null;

    /** 节点状态值,表示线程被取消 */
    static final int CANCELLED =  1;

    /** 节点状态值,表示后一个节点需要被unparking唤醒 */
    static final int SIGNAL    = -1;

    /** 节点状态值,表示在等待某一个条件,此节点在condition队列中,而不是sync队列中 */
    static final int CONDITION = -2;

    /** 节点状态值,唤醒可以向后传递,共享模式时使用 */
    static final int PROPAGATE = -3;

    /**
     * 节点状态,sync时默认为0;condition时默认初始化为Node.CONDITION
     */
    volatile int waitStatus;

    /**
     * 前一个节点
     */
    volatile Node prev;

    /**
     * 后一个节点
     */
    volatile Node next;

    /**
     * 节点代表的线程
     */
    volatile Thread thread;

    /**
     * sync时为标识位
     */
    Node nextWaiter;

    /**
     * 节点是否为共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 返回当前节点的前一个节点,前一个节点为空则抛异常
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {     // addWaiter时使用
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Condition时使用
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

AQS中的属性

/**
 * 等待队列的头结点,惰性初始化。除初始化时,只能通过setHead方法修改。
 * 注意:如果头结点存在,它的状态确保不能是CANCELLED状态。
 */
private transient volatile Node head;

/**
 * 等待队列的尾结点,惰性初始化。只能通过enq方法添加新的等待节点
 */
private transient volatile Node tail;

/**
 * 同步状态标识
 */
private volatile int state;

AQS中的方法

  • enq,即入队列(enqueue)方法,将节点插入到队列的尾部
/**
 * 插入节点到队列尾部,必要时需要进行初始化
 * @param node 需要被插入的节点
 * @return 此节点的前一个节点
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 尾节点为空,表示队列未被初始化,设置头节点为一个空的Node,并将tail指向头结点。设置完后进行下一次循环,进入else中
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t; //设置node的前节点为队列的尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node; //原子更新尾节点成功后,node成为新的尾节点tail,原来的尾节点t成为node的前节点,设置t的后节点为node
                return t;
            }
        }
    }
}
  • addWaiter,为当前线程创建节点并加入到等待队列中
/**
 * 为当前线程创建节点并加入到队列中
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) { // 与enq方法中的else中功能相同
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node); //尾节点tail为null或者更新node为尾节点失败,执行此处
    return node;
}
  • setHead,设置队列的头结点,也是出队列。
/**
 * 设置队列的头结点,也是出队列。被acquire方法调用
 *  
 * @param node the node
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
  • unparkSuccessor,唤醒挂起的节点
private void unparkSuccessor(Node node) {
    /*
     * 当前结点状态小于0时,设置状态为0
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
    * 唤醒的线程被successor持有,通常情况下就是下一个节点。但是如果被取消或者为null了,
    * 需要从队列尾tail向前查找离node最近的未被取消的successor
    */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); //唤醒线程。【对同一线程先unpark再park,park是不会阻塞线程的。】
}
  • cancelAcquire,取消正在尝试获取锁的node
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 跳过已取消的前节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // 如果node为尾节点,则将有效的前节点设置为尾节点
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null); //尾节点的next设置为null
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.

        int ws;
        if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
            // 1. pred不是头节点
            // 2. pred的状态为Node.SIGNAL或者状态<=0同时可以设置为Node.SIGNAL状态
            // 3. pred的持有的线程不为空
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next); //pred的next节点设置为当前节点的下一个节点
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}
  • shouldParkAfterFailedAcquire,检查与更新获取锁失败的节点的状态,如果线程需要挂起,返回true.
/**
 *
 * 检查与更新获取锁失败的节点的状态,如果线程需要挂起,返回true.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 这个节点已经设置为允许释放锁release来唤醒它的状态,所以它可以被挂起
         */
        return true;
    if (ws > 0) {
        /*
         * 跳过所有已经被取消的前节点
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 此处的waitStatus状态为0或者PROPAGATE。
         * 表示此节点需要一个唤醒信号,但是还没有被挂起。
         * 调用者需要重试来确保它在挂起前不能获取到锁
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  • selfInterrupt,中断当前线程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
  • parkAndCheckInterrupt,挂起当前线程并检查是否中断
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
  • acquireQueued,对于已经在队列中的节点,以独占的方式获取锁
/**
 * 对于已经在队列中的节点,以独占的方式获取锁。condition的wait和acquire方法会使用它
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //是否失败
    try {
        boolean interrupted = false; //是否中断
        for (;;) {
            /**
             * 队列中的所有节点都在此处循环调用,
             * 只有pred为头节点的节点可能获取到锁,
             * 这时重新设置获取到锁的节点为头结点(获取到锁的节点出队列)并返回
             */
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { //前节点为头节点同时获取锁成功
                setHead(node); //将node节点设置为头结点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //检查是否需要被挂起
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • acquire,获取独占锁
/**
 * 调用tryAcquire获取锁,如果成功则结束
 * 如果失败,则用addWaiter为当前线程创建独占锁节点加入到等待队列中
 * 调用acquireQueued方法不断循环检查是否能获取到锁或者挂起线程,直到获取到锁后返回
 * 可用来实现Lock#lock方法
 * @param arg
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • release,释放锁
**
 * 释放锁。
 * 可用来实现{@link Lock#unlock}
 *
 * @param arg arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        /*
         * 释放锁成功,头结点不为空且状态不为0,则唤醒头结点后面节点的线程;
         * 唤醒后,被唤醒的线程则在acquireQueued方法的for循环里循环获取锁
         */
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • tryAcquire, 需子类实现,以独占的方式获取锁
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
  • tryRelease,需子类实现,释放独占所
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

相关文章

网友评论

      本文标题:AQS之独占锁

      本文链接:https://www.haomeiwen.com/subject/xbwswxtx.html