美文网首页我爱编程数据结构和算法分析
[Java源码][并发J.U.C]---用代码一步步实现AQS(

[Java源码][并发J.U.C]---用代码一步步实现AQS(

作者: nicktming | 来源:发表于2018-08-08 08:30 被阅读1次

    前言

    本文接着上文 [Java源码][并发J.U.C]---用代码一步步实现AQS(1)---独占锁 继续分析AQS中的其他源码, 包括完善mutex类中的其他方法.

    本文源代码: 源码

    上文分析到了AQS中的acquire方法获得锁是不响应线程的, 接下来分析如何响应中断式的获取锁.

    响应中断式的获取锁

    Mutex类中修改lockInterruptibly方法如下:

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    

    AQS中加入如下代码:

        private void doAcquireInterruptibly(int arg)
                throws InterruptedException {
                final Node node = addWaiter(Node.EXCLUSIVE);
                boolean failed = true;
                try {
                    for (;;) {
                        final Node p = node.predecessor();
                        if (p == head && tryAcquire(arg)) {
                            setHead(node);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                      /**
                       * 当判断是被中断而不是被唤醒的时候,抛出InterruptedException
                       * 
                       */
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())  // 2
                            throw new InterruptedException();  
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
            }
        
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            /**
             * 如果当前线程已经被中断了 直接抛出InterruptedException
             * 注意:Thread.interrupted()会在复位当前线程的中断状态 也就是变为false
             */
            if (Thread.interrupted())       // 1
                throw new InterruptedException();
            // 尝试获取锁 如果获取不到则加入到阻塞队列中
            if (!tryAcquire(arg))  
                doAcquireInterruptibly(arg);
        }
    

    这里与acquire(int arg)有两点区别(分别在代码中的1和2处的代码):
    1. 如果当前线程已经被中断了, 会抛出InterruptedException,并且中断状态会被复位成false,因为使用的是Thread.interrupted().
    2. 在确定是被中断的时候,会抛出InterruptedException,这里需要注意两点.

    注意:
    1. parkAndCheckInterrupt()中使用的是Thread.interrupted()方法,因此该方法会把中断状态复位成false,因此整个acquireInterruptibly(int arg)方法如果抛出InterruptedException异常的话中断状态也会被复位成false.
    2. 此时抛出异常, failed依然为true, 会执行cancelAcquire(node)方法取消当前线程所对应的节点,也就是从等待队列中去除. 然后从doAcquireInterruptibly(int arg)方法中退出.

    从如下的流程图中可以更清楚的看看基本逻辑.


    juc_5(3).png

    接下来看个简单的例子测试一下

    例子1 : 测试中断式获取锁

    生成两个线程分别为thread-1thread-2, 让thread-1获得锁,并让thread-2加入该锁的等待队列中, 在thread-1还没有释放锁前也就是thread-2没有获得锁前中断thread-2看看会发生什么.

    import java.util.concurrent.TimeUnit;
    import com.sourcecode.locks.Test.Runner;
    
    public class TestLockInterruptedException {
        public static void main(String[] args) {
            Mutex m = new Mutex();
            Thread thread_1 = new Thread(new Runner(m), "thread-1");
            Thread thread_2 = new Thread(new Runner(m), "thread-2");
            thread_1.start();
            try {
                TimeUnit.SECONDS.sleep(1); //让thread-1获得锁
                thread_2.start();
                TimeUnit.SECONDS.sleep(1); //让thread-2充分进入到等待队列中
                m.printWaitingNode();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            thread_2.interrupt();
        }
        
        static class Runner implements Runnable {
            Mutex m;
            public Runner(Mutex m) {
                this.m = m;
            }
            @Override
            public void run() {
                boolean getLock = true;
                try {
                    m.lockInterruptibly();
                } catch (Exception e) {
                    e.printStackTrace();
                    //Thread.currentThread().interrupt(); //报告一下中断状态  因为抛出异常前中断状态被清空了
                    getLock = false;
                }
                System.out.println(Thread.currentThread().getName() + " runs, getLock: " + getLock);
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(getLock) m.unlock();
            }
        }
    }
    

    测试结果如下: thread-2会进入到catch语句块中并且它的中断状态已经被复位了.

    thread-1 runs, getLock: true
    [NULL,-1]->[thread-2,0]->
    java.lang.InterruptedException
    thread-2 intrrupted status:false
    thread-2 runs, getLock: false
        at com.sourcecode.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:357)
        at com.sourcecode.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:375)
        at com.sourcecode.locks.Mutex.lockInterruptibly(Mutex.java:41)
        at com.sourcecode.locks.TestLockInterruptedException$Runner.run(TestLockInterruptedException.java:32)
        at java.lang.Thread.run(Thread.java:745)
    

    但是如果把catch语句块中的注释打开会发生什么呢?

    thread-1 runs, getLock: true
    [NULL,-1]->[thread-2,0]->
    java.lang.InterruptedException
    thread-2 intrrupted status:false
    thread-2 runs, getLock: false
        at com.sourcecode.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:357)
        at com.sourcecode.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:375)
        at com.sourcecode.locks.Mutex.lockInterruptibly(Mutex.java:41)
        at com.sourcecode.locks.TestLockInterruptedException$Runner.run(TestLockInterruptedException.java:32)
        at java.lang.Thread.run(Thread.java:745)
    java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at com.sourcecode.locks.TestLockInterruptedException$Runner.run(TestLockInterruptedException.java:41)
        at java.lang.Thread.run(Thread.java:745)
    

    可以从结果中看到TimeUnit.SECONDS.sleep(10);也抛出了异常,原因不难找到, 从下面sleep的源码中可以看到如果当前线程的中断状态是true的时候, 该方法会认为该线程被中断了,异常会抛出异常并且复位它的中断异常状态. 关于异常可以看我的另外一篇博客 [并发J.U.C] 用例子理解线程中断

     * @throws  InterruptedException
         *          if any thread has interrupted the current thread. The
         *          <i>interrupted status</i> of the current thread is
         *          cleared when this exception is thrown.
         */
        public static native void sleep(long millis) throws InterruptedException;
    

    接下来看看tryLock方法.

    tryLock方法 尝试性的去获取锁

    那什么叫尝试性的去获取锁?在接口Lock中有定义

    // 获取锁 如果锁是available立即返回true, 如果锁不存在就立即返回false
        boolean tryLock();
    

    接下来看看是如何实现的, 先在Mutex类中修改

        @Override
        public boolean tryLock() {
            // TODO Auto-generated method stub
            return sync.tryAcquire(1);
        }
    

    可以看到很简单,直接调用了sync自己实现的tryAcquire, 如果锁是可以得到的,则立即返回true表明已经获得了锁, 否则立马返回, 不会进入到锁的等待队列中.

    简单看一个tryLock的小例子

    例子2: tryLock

    import java.util.concurrent.TimeUnit;
    public class TestTryLock {
        public static void main(String[] args) {
            Mutex m = new Mutex();
            for (int i = 0; i < 5; i++) {
                new Thread(new Runner(m), "thread-" + i).start();;
            }
            try {
                TimeUnit.SECONDS.sleep(3); // 为了让每个thread充分运行
                m.printWaitingNode();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        static class Runner implements Runnable {
            Mutex m;
            public Runner(Mutex m) {
                this.m = m;
            }
            @Override
            public void run() {
                if (m.tryLock()) {
                    System.out.println(Thread.currentThread().getName() + " get lock and runs");
                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }
                    m.unlock();
                } else {
                    System.out.println(Thread.currentThread().getName() + " does not get lock and runs");
                }
            }
        }
    }
    

    输出如下: 都没有进入到等待队列中.

    thread-1 get lock and runs
    thread-3 does not get lock and runs
    thread-0 does not get lock and runs
    thread-2 does not get lock and runs
    thread-4 does not get lock and runs
    

    接下来看看 tryLock的另外一种形式tryLock(long time, TimeUnit unit) throws InterruptedException

    等待式并且响应中断式的tryLock->tryLock(long time, TimeUnit unit) throws InterruptedException

    先直接看源码吧, 在Mutex类中加入如下代码:

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            // TODO Auto-generated method stub
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
    

    AQS 中加入

        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
        }
    

    这次直接对比响应中断的获取锁的doAcquireInterruptibly方法的主要区别是下面这段代码:

    nanosTimeout = deadline - System.nanoTime();                    // 1
    if (nanosTimeout <= 0L)                                         // 2
            return false;
    if (shouldParkAfterFailedAcquire(p, node) &&
            nanosTimeout > spinForTimeoutThreshold)                 // 3
            LockSupport.parkNanos(this, nanosTimeout);  
    if (Thread.interrupted())                                       // 4
           throw new InterruptedException();
    

    1. 计算当前剩下多长时间
    2. 判断是否有超过所传入的等待时间
    3. 判断是否需要进行休眠
    4. 如果该线程被中断, 抛出异常

    其实与doAcquireInterruptibly方法类似, 只是加了个超时返回的操作.

    例子3: tryLock(long time, TimeUnit unit) throws InterruptedException

    启动5个线程去超时获得锁.

    import java.util.concurrent.TimeUnit;
    
    public class TestTryLockTime {
        public static void main(String[] args) {
            Mutex m = new Mutex();
            for (int i = 0; i < 5; i++) {
                new Thread(new Runner(m), "thread-" + i).start();;
            }
            try {
                TimeUnit.SECONDS.sleep(3); // 为了让每个thread充分运行
                m.printWaitingNode();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        static class Runner implements Runnable {
            Mutex m;
            public Runner(Mutex m) {
                this.m = m;
            }
            @Override
            public void run() {
                boolean getLock = false;
                try {
                    getLock = m.tryLock(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (getLock && !Thread.currentThread().isInterrupted()) {
                    System.out.println(Thread.currentThread().getName() + " get lock and runs");
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }
                    m.unlock();
                } else {
                    System.out.println(Thread.currentThread().getName() + " does not get lock and runs");
                }
            }
        }
    }
    

    输出如下: thread-1thread-3获得了锁,而其他线程由于超时等待返回了

    thread-1 get lock and runs
    [NULL,-1]->[thread-3,-1]->[thread-0,-1]->[thread-2,-1]->[thread-4,0]->
    thread-3 get lock and runs
    thread-4 does not get lock and runs
    thread-0 does not get lock and runs
    thread-2 does not get lock and runs
    

    关于异常部分与例子1类似,便不再写例子了.

    参考

    1. Java并发编程的艺术
    2. Java1.8 java.util.concurrent.locks包的源代码

    相关文章

      网友评论

        本文标题:[Java源码][并发J.U.C]---用代码一步步实现AQS(

        本文链接:https://www.haomeiwen.com/subject/ribzvftx.html