美文网首页我爱编程
[Java源码][并发J.U.C]---LockSupport

[Java源码][并发J.U.C]---LockSupport

作者: nicktming | 来源:发表于2018-08-07 00:20 被阅读0次

    前言

    本篇文章主要分析LockSupport,在系列文章中的AQS就有用到. 本文会通过源码分析中看看LockSupport如何使用.

    本文代码: 代码下载

    LockSupport源码

    LockSupportjava.util.concurrent.locks包中,总共的代码就一两百行左右,如下:

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;
    
    public class LockSupport {
        
        private LockSupport() {} 
    
        private static void setBlocker(Thread t, Object arg) {
            UNSAFE.putObject(t, parkBlockerOffset, arg);
        }
        
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
        
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
        
        public static void parkNanos(Object blocker, long nanos) {
            if (nanos > 0) {
                Thread t = Thread.currentThread();
                setBlocker(t, blocker);
                UNSAFE.park(false, nanos);
                setBlocker(t, null);
            }
        }
        
        public static void parkUntil(Object blocker, long deadline) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(true, deadline);
            setBlocker(t, null);
        }
        
        public static Object getBlocker(Thread t) {
            if (t == null)
                throw new NullPointerException();
            return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
        }
        
        public static void park() {
            UNSAFE.park(false, 0L);
        }
        
        public static void parkNanos(long nanos) {
            if (nanos > 0)
                UNSAFE.park(false, nanos);
        }
        
        public static void parkUntil(long deadline) {
            UNSAFE.park(true, deadline);
        }
        
        static final int nextSecondarySeed() {
            int r;
            Thread t = Thread.currentThread();
            if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
                r ^= r << 13;   // xorshift
                r ^= r >>> 17;
                r ^= r << 5;
            }
            else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
                r = 1; // avoid zero
            UNSAFE.putInt(t, SECONDARY, r);
            return r;
        }
    
        // Hotspot implementation via intrinsics API
        private static final sun.misc.Unsafe UNSAFE;
        private static final long parkBlockerOffset;
        private static final long SEED;
        private static final long PROBE;
        private static final long SECONDARY;
        static {
            try {
        //UNSAFE = sun.misc.Unsafe.getUnsafe(); 源码
                    Field f = Unsafe.class.getDeclaredField("theUnsafe");
                    f.setAccessible(true);
                    UNSAFE = (Unsafe)f.get(null);
                    
                Class<?> tk = Thread.class;
                parkBlockerOffset = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("parkBlocker"));
                SEED = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomSeed"));
                PROBE = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomProbe"));
                SECONDARY = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
            } catch (Exception ex) { throw new Error(ex); }
        }
    }
    

    代码中只有在获得UNSAFE的时候改成了反射实现,其余的都是维持原样.因为需要有特定的classloader才可以加载Unsafe类,因此在自己的代码中用反射来拿到实例对象.
    另外Unsafe类的作用会有专门博客分析,暂时可以认为该类可以利用底层的方法成功操作.
    注意: tkThread.class,操作的是Thread类的成员变量.

    从上面源码中可以看到:

    1. 只有一个构造方法并且还是私有的,所以没办法new出一个对象.
    2. 方法几乎都是静态的,所以通过类就可以调用.
    3. 所有的park*方法最终都会调用UNSAFE.park(boolean, long)方法,unpark最终还是会调用UNSAFE.unpark(Thread)方法.
    4. 有一个setBlockergetBlocker方法.

    所以从该源码中也具体看不出逻辑和实现,所以看看Unsafeparkunpark方法.

    sun.misc.Unsafe中的parkunpark

    sun.misc.Unsafe中的源码

    public native void unpark(Object paramObject);
    public native void park(boolean paramBoolean, long paramLong);
    

    这个是本地方法,实现如下:

    源码在park.hpp(这个是1.7的,没有找到1.8的,有地址,但是还是没有找到具体的类的位置)
    每个线程都有一个Parker实例,每个实例有一个_counter属性,初始化值为0.

    图片.png

    对应的parkunpark源码, 源码文件

    park方法

    图片.png

    isAbsolute表示是否是绝对时间,可以看到parkUntil(long deadline)调用的时候表示的是绝对时间

    1. 如果_counter变量大于0,便可以获得许可permit.
    2. 可选性的优化,在阻塞前检查一下中断状态.如果当前线程处于中断状态,则直接返回._counter没有变化,此时的_counter处于
    3. 检查一下时间是否到时,比如那种超时获取许可的方法等,另外如果time > 0,则调用unpackTime(&absTime, isAbsolute, time);方法

    图片.png

    4. 生成一个ThreadBlockInVM
    5. 这句话我还没有弄明白
    6. 再次检查_counter是不是大于0,如果是的话表示可以获得许可并且把_counter并且unlock _mutex.

    图片.png

    7. 该线程进行等待状态, 如果time等于0,将会调用pthread_cond_wait (_cond, _mutex),否则会进入safe_cond_timedwait (_cond, _mutex, &absTime)应该是TimedWait状态(我没有测试,应该是)
    8. 等到等待返回后, 则把_counter设置为0并且unlock mutex.

    unpark

    图片.png

    直接设置_counter为1,如果s也就是_counter以前的值是0的话需要调用pthread_cond_singal (_cond)唤醒被阻塞的线程.

    稍微解释一下_counter的值等于0的时候的情况:
    1. 该线程还没有调用过park方法.
    2. 该线程在unpark前调用park方法,此时该线程会阻塞,并且_counter值为0. 如果在unpark后调用park方法,此时该线程不会阻塞但是_counter值也为0.

    在明白了基本原理之后,我们抛出别的锁之类的东西只关注_counter画个简单的逻辑图帮助理解.

    logic.png

    接下来看看具体的例子理解一下.

    例子1

    park的意思表示想要停车,停车当然需要获得许可,但是默认是不能获得许可(因为_counter默认为0).

    public class TestLockSupport1 {
        public static void main(String[] args) {
            //test_1();
            //test_2();
            //test_3();
            //test_4();
        }
        
        public static void test_1() {
            //默认的时候当前线程的_counter = 0
            LockSupport.unpark(Thread.currentThread()); //_counter = 1
            LockSupport.park(); //_counter = 0
            System.out.println("i can execute."); // 可以执行
        }
        
        public static void test_2() {
            //默认的时候当前线程的_counter = 0
            LockSupport.park(); //阻塞
            System.out.println("i cannot execute."); //不能执行
        }
        
        public static void test_3() {
            LockSupport.parkNanos(10);
            System.out.println("i can execute after 10ns"); //此时的_counter等于0
            LockSupport.park();
            System.out.println("i cannnot execute");
        }
        
        public static void test_4() {
            long start = System.currentTimeMillis();
            System.out.println("start:" + start);
            LockSupport.parkUntil(start + 10000);
            System.out.println("end:" + System.currentTimeMillis());
            System.out.println("i can execute after 10s"); //此时的_counter等于0
            LockSupport.park();
            System.out.println("i cannnot execute");
        }
    }
    

    test_1: 按照上面的分析,当前线程的_counter默认是0,所以当调用unpark会把该线程的_counter设置为1,因此再调用park的时候表示可以获得许可,也就是不会阻塞,所以i can execute会被执行.
    test_2: 因为默认的时候_counter是0,所以当调用park的时候会阻塞当前线程,因此i cannot execute.这句话不会执行.
    test_3/test_4: 调用park的另外一种形式,在等待一定时间后如果还没有获得许可就直接返回了.

    例子2

    在调用park方法可能阻塞的时候,除了超时等待这种情况,还可以让别的线程中断该阻塞的线程从阻塞状态中恢复.

    public class TestLockSupport2 {
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(new Runner1(), "mythread-1");
            thread.start();
            Thread.sleep(1);  //保证thread可以充分运行
            thread.interrupt();
            Thread.sleep(1);  //保证after park() interrupted status 可以运行
            System.out.println("main thread end!");
        }
        
        static class Runner1 implements Runnable {
            @Override
            public void run() {
                System.out.println("before park() interrupted status:" + Thread.currentThread().isInterrupted());
                LockSupport.park();
                System.out.println("after park() interrupted status:" + Thread.currentThread().isInterrupted());
            }
        }
    }
    

    输出如下:

    before park() interrupted status:false
    after park() interrupted status:true
    main thread end!
    

    从结果中可以看到,中断该线程的时候只是改变该线程的中断状态并从阻塞状态中恢复, 并且没有抛出InterruptedException异常.
    关于中断可以参考我的另外一个博客 [并发J.U.C] 用例子理解线程中断
    关于异常可以参考我的另外一个博客 理解异常的基本语法 (1)

    例子3

    关于blocker的一个简单使用

    public class TestLockSupport3 {
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(new Runner1(), "mythread-1");
            thread.start();
            Thread.sleep(2);  //保证thread可以充分运行
            System.out.println("thread blocker:" + LockSupport.getBlocker(thread));
            thread.interrupt();
            Thread.sleep(1);  //保证after park() interrupted status 可以运行
            System.out.println("thread blocker:" + LockSupport.getBlocker(thread));
            System.out.println("main thread end!");
        }
        
        static class Runner1 implements Runnable {
            String str = "i am a blocker instance.";
            @Override
            public void run() {
                System.out.println("before park() interrupted status:" + Thread.currentThread().isInterrupted());
                LockSupport.park(str);
                System.out.println("after park() interrupted status:" + Thread.currentThread().isInterrupted());
            }
        }
    }
    

    输出

    before park() interrupted status:false
    thread blocker:i am a blocker instance.
    after park() interrupted status:true
    thread blocker:null
    main thread end!
    

    相关文章

      网友评论

        本文标题:[Java源码][并发J.U.C]---LockSupport

        本文链接:https://www.haomeiwen.com/subject/pamivftx.html