美文网首页程序员
Java并发-Lock接口与队列同步器AQS

Java并发-Lock接口与队列同步器AQS

作者: 油多坏不了菜 | 来源:发表于2019-02-27 14:10 被阅读3次

    Lock接口

    与synchronized关键字相比拥有了锁获取与释放的可操作性,可非阻塞的获取锁、可中断的获取锁、超时获取锁

    标准接口定义

     void lock();
     void lockInterruptibly() throws InterruptedException;//可中断的阻塞获取锁
    Condition newCondition();
    boolean tryLock();//非阻塞的获取锁,马上返回
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//最大阻塞time时间,超时之后返回,可中断
    void unlock();//释放锁
    

    标准使用方式

    注意不要将获取锁的过程写到try块里面

    Lock lock = new ReentrantLock();
    lock.lock();
    try{
            //to do
    }finally{
            lock.unlock();
    }
    

    队列同步器AQS(AbstractQueuedSynchronizer)

    简单的理解是对于请求获取锁的线程,如果当前锁已经被别的线程获取,那么当前线程需要到队列中排队。如果队列中的第一个线程释放了锁,那么就会唤醒排在它后面的一个线程去获取锁(如果非公平锁,可能会被插队)。

    核心字段

    private volatile int state;//同步状态
    private transient volatile Node head;//同步队列的头指针
    private transient volatile Node tail;//同步队列的尾指针
    

    核心方法

    int getState();
    void setState(int newState);
    boolean compareAndSetState(int expect, int update);
    boolean isHeldExclusively();
    
    void acquire(long arg);
    boolean release(long arg);
    void acquireShared(long arg);
    boolean releaseShared(long arg);
    

    同步队列的Node节点

    这里最好结合源码的英文注释理解

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    

    独占式同步状态的获取源码分析

    acquire方法

    首先非阻塞的获取同步状态(tryAcquire方法),如果失败的话会把当前线程信息构造一个Node节点加入到同步队列的尾部进行排队(这里可能会使线程进入waiting状态)

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire方法

    非阻塞的获取同步状态,失败返回false.这个是子类实现的。(模板方法模式)

    addWaiter方法

    将当前节点加入到同步队列的尾部

    private Node addWaiter(Node mode) {
    // 这里mode为EXCLUSIVE,也就是 node.nextWaiter==null.
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
    //下面if逻辑是尝试快速插入到队列尾部,如果失败到enq函数
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);//当前节点插入到队列尾部
            return node;
        }
    

    enq 方法

    当前节点插入到队列尾部,保证成功,可能重试多次。采用CAS更改改共享变量的标准写法。

     private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    acquireQueued方法

    对于刚进入队列排队的节点或者刚被唤醒的线程(LockSupport.unpark(thread)),会检查它前面的那个节点是不是head节点,如果是的话尝试获取锁。如果不是的话会判断当前线程是否可以到waiting状态(释放cpu资源,避免盲等),(判断的依据主要是当前节点的前一个节点的waitStatus是否为SIGNAL),如果可以睡眠,就会睡眠直到被前一个节点唤醒。

     final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //如果节点的前驱节点为头节点,并且获取同步状态成功,则把当前节点设置为头节点,然后返回
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                  // 判断当前节点是否可以睡眠或者把当前节点状态改到可睡眠状态(可以睡否最重要的依据是睡后有人唤醒你,否则你就醒不来了)
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    shouldParkAfterFailedAcquire方法

    如果暂时不能获取同步状态,线程会考虑睡一会(park)

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            //如果当前节点的前驱节点的waitStatus为Node.SIGNAL,则其前驱节点在释放锁之后就会唤醒(uppark)当前线程,所以当前线程可以放心的park
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
    //如果前驱节点已经放弃获取锁,更新其前驱节点
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                //尝试把前驱节点状态改为Node.SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    parkAndCheckInterrupt方法

    当前线程park,到waiting 状态,线程阻塞于当前方法直到被中断或者unpark

     private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    独占式同步状态释放源码分析

    release方法

    释放同步状态,唤醒队里中的下一个节点。

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    unparkSuccessor方法

    唤醒队列中下一个节点去参与锁的竞争

    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    共享式获取同步状态源码分析

    acquireShared方法

    如果没有获取同步状态成功,就会将当前节点更新加入到同步队列。

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

    doAcquireShared

    同步队列中获取独占式同步状态,类似于独占模式同步状态的获取。

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //如果当前节点获取同步状态成功,会将当前节点设为头节点,
                           //并且传播到它的下一个节点(下一个节点尝试获取同步状态)
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    setHeadAndPropagate方法

    设置node节点为head节点,并且尝试传播到下一个节点(因为共享模式的同步状态是可以有多个线程同时获取的)

    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
             //如果下一个节点是共享模式的节点,就考虑唤醒下一个节点去竞争性获取共享状态。
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    共享式释放同步状态源码分析

    releaseShared方法

     public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    相关文章

      网友评论

        本文标题:Java并发-Lock接口与队列同步器AQS

        本文链接:https://www.haomeiwen.com/subject/uldtuqtx.html