AQS

作者: Joker____ | 来源:发表于2018-03-27 13:59 被阅读0次

    简述

    AQS全称AbstractQueuedSynchronizer,提供实现阻塞锁和相关的框架
    JDK中使用AQS来实现的同步工具类有ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch。
    AQS中有一个状态变量,子类通过改变这个状态变量来改变锁的状态。


    AQS原理解析

    通过Lock来讲解,讲解ReentrantLock中的非公平锁

    final void lock() {
                //通过cas将状态设置成1
                if (compareAndSetState(0, 1))
                    //将线程设置成当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                //获取线程
                    acquire(1);
    }
     public final void acquire(int arg) {
            //尝试获取(子类实现),获取失败加入CLH队列、中断当前线程
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {//锁未被占用
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {//当前线程是本线程
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
    }
    //添加等待者节点、采用尾插法
    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
    }
    /**
    * 通过死循环遍历,只有遍历到p是头节点并且能够获得资源才可以退出。
    * shouldParkAfterFailedAcquire判断当前节点的状态,
    * 通过parkAndCheckInterrupt将线程设成waitting状态
    * 需要中断或unpack唤醒
    */
     final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
    }
    

    release

    public void unlock() {
            sync.release(1);
    }
    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
    }
    
    protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
    }
    
    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*找到队列中可以运行的,最靠近头部的节点
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //唤醒该线程
            if (s != null)
                LockSupport.unpark(s.thread);
    }
    

    讲解CLH队列

    <pre>
         *      +------+  prev +-----+       +-----+
         * head |      | <---- |     | <---- |     |  tail
         *      +------+       +-----+       +-----+
         * </pre>
    

    CLH队列通过双向链表实现队列的功能,将等待的线程放入到队列中。


    AQS优点

    • 用户可以通过AQS快速的实现自己的同步工具类
    • AQS相对于内置锁有更高的性能。AQS通过CAS实现,实现了无锁化。

    相关文章

      网友评论

          本文标题:AQS

          本文链接:https://www.haomeiwen.com/subject/sjbrcftx.html