美文网首页技术栈
2019-04-23——Java并发包 AQS&Lock

2019-04-23——Java并发包 AQS&Lock

作者: 烟雨乱平生 | 来源:发表于2019-04-23 18:04 被阅读0次

    首先是一个例子

    static void t1(){
        final Lock lock = new ReentrantLock();
        new Thread(() -> {
            try{
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                /*获取锁*/
                lock.lock();
                System.out.println("线程1获取到锁");
                System.out.println("线程1执行操作开始");
                for (int i = 0; i < 10; i++) {
                    System.out.println("线程1执行操作");
                    try {
                        Thread.sleep(20000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("线程1执行操作完成");
            }finally {
                System.out.println("线程1释放锁");
                lock.unlock();
            }
        },"t1").start();
    
        new Thread(()->{
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try{
                /*获取锁*/
                lock.lock();
                System.out.println("线程2获取到锁");
                System.out.println("线程2执行操作开始");
                for (int i = 0; i < 10; i++) {
                    System.out.println("线程2执行操作");
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("线程2执行操作完成");
            }finally {
                System.out.println("线程2释放锁");
                lock.unlock();
            }
    
        },"t2").start();
    }
    

    在该例子中有两个线程t1t2,这两个线程都在竞争lock
    假设先执行到线程t1lock.lock()代码段。
    接下来分析lock方法

    public void lock() {
        sync.lock();
    }
    
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    

    (1)假设这个时候没有其他线程竞争到锁(即t2线程还没有执行到lock.lock()代码段),即AbstractQueuedSynchronizerstate属性没有被修改,那么它此时的默认值为0,所以compareAndSetState方法会执行成功,即线程t1竞争到锁。所以接下来会执行setExclusiveOwnerThread方法。

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    

    (2)假设在t1线程竞争到锁后,t2线程也执行到了这个方法,由于t1已经将AbstractQueuedSynchronizerstate属性改为1了,所以t2线程执行compareAndSetState方法会返回false。所以将会调用acquire方法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    t2线程执行acquire方法的时候会首先执行tryAcquire方法尝试获取锁,通过调用链执行到nonfairTryAcquire方法。final Thread current = Thread.currentThread();得到的current为线程t2
    在执行到int c = getState();的时候,t1线程有可能释放了锁,也有可能没有释放锁。

    • 假设此时持有锁的线程已经释放锁了,即t1线程已经执行lock.unlock();,并且释放锁了。
      此时int c = getState();之后获取的c的值为0。所以会执行
    if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
    

    假设此时没有其他线程竞争到锁,那么t2线程将会竞争到锁,将AbstractQueuedSynchronizerstate属性改为1并返回true。如果此时被并发的其他线程竞争到了锁,那么nonfairTryAcquire方法将会直接返回false

    • 假设此时持有锁的线程还没有释放锁,即t1线程仍然在执行中没有释放锁。
      此时int c = getState();之后获取的c的值不为0。由于currentt2getExclusiveOwnerThread()t1,所以将会直接返回false。
    • 假设持有所得线程再次调用lock方法
      此时将会执行
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    

        即可重入锁。

    如果t2调用tryAcquire方法获取锁失败将会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    首先调用addWaiter方法将该线程入队。

    1. 通过当前线程t2,创建一个节点
    2. 获取双向队列的尾节点
    3. 如果尾节点不为空则将t2节点入队尾。
    4. 如果t2节点入队成功则返回该节点,否则执行下一步
    5. 如果当前队列的尾节点为空,或则新的节点没有入队成功(即出现并发情况其他线程入队了),则调用enq方法。

    看下enq方法

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    1. 获取双向队列的尾节点
    2. 如果尾节点为空(即并发入队的节点现在有全部出队了,或者第一个线程入队),则创建一个新的空节点,头尾指针均指向该空节点。返回第一步。
    3. 如果尾节点不为空则尝试将该节点入队,直到成功为止。

    当线程入队后执行acquireQueued方法

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    1. 获取当前节点的前置节点
    2. 如果当前节点的前置节点是头节点则尝试获取独占锁
    3. 如果获取到锁,就把当前节点设置成头节点
    4. 如果前置节点不是头节点或者获取锁失败继续往下执行
    5. 执行shouldParkAfterFailedAcquire方法
    6. 如果上一步返回true,则执行parkAndCheckInterrupt方法,否则执行第一步
    7. 如果parkAndCheckInterrupt方法返回true,则将interrupted设置为true

    看下shouldParkAfterFailedAcquire方法

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    1. 如果前置节点的等待状态是SIGNAL,则直接返回true
    2. 如果前置节点的状态是CANCELLED(只有CANCELLED的值大于0),则寻找前一个不为CANCELLED状态的节点,返回false
    3. 否则的话将前置节点的等待状态置为SIGNAL,返回false

    在看下parkAndCheckInterrupt方法

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    1. 阻塞当前线程
    2. 清除当前线程的中断状态并返回线程是否为中断状态

    所以acquireQueued方法的语义是:
    如果当前节点的前置节点是头节点,则允许该节点参与锁资源的竞争,如果竞争锁失败则获取当前节点的前一个为取消的节点,设置其等待状态为信号量,并阻塞当前线程,等待前一个节点唤醒,并返回该线程的中断标记

    整个获取锁的过程可以描述为:


    lock.png

    当前有个线程已经获取锁并执行,剩下的线程首先尝试竞争独显锁,竞争失败则进入等待队列。当节点的前置节点为头节点则允许线程自旋去竞争锁,如果锁被新的线程竞争走了,则该节点进入阻塞状态。

    相关文章

      网友评论

        本文标题:2019-04-23——Java并发包 AQS&Lock

        本文链接:https://www.haomeiwen.com/subject/alsvgqtx.html