美文网首页
java多线程与高并发(五)LockSupport

java多线程与高并发(五)LockSupport

作者: 小偷阿辉 | 来源:发表于2021-04-26 11:07 被阅读0次

    回顾

    首先我们回顾一下前面四节所讲的东西

    1.线程的基本概念
    2.synchronized,底层实现原理,锁升级(无锁-偏向锁-轻量级锁-重量级锁)
    3.volatile,线程隔离可见性,禁止指令重排序
    4.AtomicXXX
    5.各种UC同步框架(ReentrantLock,CountDownLatch,CyclicBarrier,Phaser,ReadWriteLock,Semaphore,Exchange)
    6.synchronized 和 各种线程同步框架的ReenrantLock有何不同?
    6.1.synchronizec:系统自带,系统自动加锁,自动解锁,不可以出现多个不同的等待队列,默认进行四种锁的升级
    6.2.ReentrantLock:需要自动加锁,手动解锁,可以出现多个不同的等待队列,CAS的实现
    本章我们补一个LockSupport,然后分析两个面试题,紧接着我会教大家阅读源码的技巧,最后拿AQS作源码分析

    2.LockSupport

    我们下面以几个小程序案例,对LockSupport进行详细解释,在以前我们要阻塞和唤醒某一个具体的线程有很多的限制,比如

    因为wait方法需要释放锁,所以必须在synchronized中使用,否则会抛出异常lllegalMonitorStateException
    nofity方法必须在synchronized中使用,并且应该指定对象
    synchronized,nofity,wait的对象必须一致,一个synchronized代码块中只能有一个线程用wait和nofity
    以上诸多的使用不便,带来了lockSupport的好处
    先来看第一个小程序

    package com.learn.thread.four;
    
    import com.sun.tools.internal.ws.wsdl.document.soap.SOAPUse;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.LockSupport;
    
    public class TestLockSupport {
        public static void main(String[] args) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    System.out.println(i);
                    // 使用LockSupport进行线程堵塞
                    if (i == 5) {
                        LockSupport.park();
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                }
    
            });
            thread.start();
        }
    }
    

    上述代码看起来是比较灵活的,可以手动控制是否上锁,通过LockSupprt.park加锁,有了加锁机制,LockSupport也提供了令牌解锁机制unpark

    package com.learn.thread.four;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.LockSupport;
    
    public class TestLockSupport2 {
        public static void main(String[] args) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    System.out.println(i);
                    // 使用LockSupport进行线程堵塞
                    if (i == 5) {
                        LockSupport.park();
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                }
    
            });
            thread.start();
            // 唤醒线程t,注意这里是比park先执行的,当前与释放了一个令牌,当park拿到这个令牌后不阻塞直接放行
            LockSupport.unpark(thread);
        }
    }
    
    

    注意,这里是一个park对应着一个unpark

    package com.learn.thread.four;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
    
    public class TestLockSupport3 {
        public static void main(String[] args) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    System.out.println(i);
                    // 使用LockSupport进行线程堵塞
                    if (i == 5) {
                        LockSupport.park();
                    }
    
                    // 使用LockSupport进行线程堵塞(前面只会释放一次锁,这里会再次阻塞)
                    if (i == 8) {
                        LockSupport.park();
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                }
    
            });
            thread.start();
            LockSupport.unpark(thread);
        }
    
    }
    

    2.1.由以上三个小程序,我们可以总结出以下几点

    LockSupport不需要synchronized就可以实现线程的阻塞和唤醒
    LockSupport.unpark可以线程LockSupport.park,并且线程不会阻塞
    如果一个线程处于等待状态,连续调用了两次park方法,就会使该线程永远无法唤醒
    LockSupport中的park 和unpark的实现原理

    其实park()和unpark()的实现也是由Unsefa类提供的,而Unsefa类是由C和c++语言完成的,其实原理也是可以理解,它主要是通过一个变量作为标志,变量在0-1之间来回切换,当这个变量大于0的时候线程就获得了“令牌”(unpark的操作),相反park的操作就是将这个变量变成0,用于识别令牌。

    2.2.淘宝面试题1

    实现一个容器,提供两个方法add和size 写两个线程
    线程1:添加十个元素到容器中
    线程2:实时监控元素个数,当个数到5个时,线程2给出提示并结束

    小程序1
    我们先来分析一下小程序1的执行流程:
    通过内部的List来new 一个ArraryList,在自定义的add方法直接调用list的add方法,在自定义的size方法调用list的size方法。然后小程序初始化这个List,启动线程t1来做一个循环,每次循环都添加一个对象,加一个打印,然后间隔一秒,在t2线程写一个while循环,实时监控集合中对象的变换,如果数量达到5就结束t2的线程。

    package com.learn.thread.four.list;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestListAdd1 {
        List<Object> list = new ArrayList<Object>(16);
    
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
    
        public static void main(String[] args) {
            TestListAdd1 testListAdd1 = new TestListAdd1();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    testListAdd1.add(new Object());
                    System.out.println(i);
                }
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "t1").start();
            // t2 线程监听集合的数量,发现获取不到数量为5的时候,原因是线程之间不可见的原因,
            // 因为t1 添加完对象之后,肯定会更新size 方法,但是在更新size的过程中,t2线程已经开始读了size方法,造成了数据不一致
            new Thread(() -> {
                while (true) {
                    if (testListAdd1.size() == 5 ) {
                        break;
                    }
                }
                System.out.println("t2 结束");
            },"t2").start();
    
        }
    }
    

    结论:
    方法并没有按预期的执行,这是因为,ArraryList的Size方法肯定要更新的,但是还没有更新完,线程2就读了,所以这时候永远不会检测到List的长度为5了,原因就是线程之间的不可见因素

    小程序2

    package com.learn.thread.four.list;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestListAdd2 {
        // 这里用volatile修饰引用类型,发现并没有做到线程的之间的可见性,因为引用对象指向的是一个地址,
        // 如果这个对象的内部值被改变了,是无法被观察到的
        volatile List<Object> list = new ArrayList<Object>(16);
    
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
    
        public static void main(String[] args) {
            TestListAdd2 testListAdd2 = new TestListAdd2();
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    testListAdd2.add(new Object());
                    System.out.println(i);
                }
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "t1").start();
            // t2 线程监听集合的数量,发现获取不到数量为5的时候,原因是线程之间不可见的原因,
            new Thread(() -> {
                while (true) {
                    if (testListAdd2.size() == 5 ) {
                        break;
                    }
                }
                System.out.println("t2 结束");
            },"t2").start();
    
        }
    }
    

    结论:
    小程序2在小程序1的基础上加上了线程可见volitate修饰,但是还是无法满足需求,这是因为volitate要去修饰普通的值,不要去修饰引用值,因为修饰引用类型,这个引用对象指向的是另外一个new出来的对象,如果这个对象里边的成员对象改变了,是无法被观察到的。
    下面自行写了个测试修饰引用类型的demo

    package com.learn.thread.four.list;
    
    import com.learn.thread.first.T;
    
    public class Test {
        private volatile static int a = 0;
        // private volatile static Integer a = 0;
    
        public void test() {
            a++;
            System.out.println(a);
        }
    
        public static void main(String[] args) {
            Thread[] threads = new Thread[100];
            Test test = new Test();
            for (int i = threads.length - 1; i >= 0; i--) {
                threads[i] = new Thread(() -> test.test());
                threads[i].start();
            }
    
        }
    }
    

    小程序3

    package com.learn.thread.four.list;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 用对象锁
     * wait,和nofity实现
     */
    public class TestListAdd3 {
        private List<Object> list = new ArrayList<Object>(16);
    
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
    
        public static void main(String[] args) {
            TestListAdd3 testListAdd3 = new TestListAdd3();
            final Object lock = new Object();
            // t2 要早于t1启动
            new Thread(() -> {
                synchronized (lock) {
                    System.out.println("t2 启动");
                    if (testListAdd3.size() != 5 ) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2 结束");
                }
            },"t2").start();
            new Thread(() -> {
                synchronized (lock) {
                    System.out.println("t1 启动");
                    for (int i = 0; i < 10; i++) {
                        testListAdd3.add(new Object());
                        System.out.println(i);
                        if (i == 5) {
                            // 此处有坑
                            // 特别注意,nofity并不释放当前锁,t1 会继续往下走,等t1 执行完了, t2 才会拿到这把对象锁
                            lock.notify();
                        }
                    }
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1").start();
        }
    }
    

    结论:
    小程序3 也是行不通,原因是nofity方法不释放锁,当t1线程调用nofity方法,并没有释放当前锁,所以t1还是会继续运行,等待t1执行完毕,t2才会继续执行,这个时候当前List不只有5个了。

    小程序4

    package com.learn.thread.four.list;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestListAdd4 {
        private List<Object> list = new ArrayList<Object>(16);
    
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
    
        public static void main(String[] args) {
            TestListAdd4 testListAdd4 = new TestListAdd4();
            final Object lock = new Object();
            // t2 要早于t1启动
            new Thread(() -> {
                synchronized (lock) {
                    System.out.println("t2 启动");
                    if (testListAdd4.size() != 5 ) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("t2 结束");
                    // 释放当前锁,唤醒t1
                    lock.notify();
                }
            },"t2").start();
            new Thread(() -> {
                synchronized (lock) {
                    System.out.println("t1 启动");
                    for (int i = 0; i < 10; i++) {
                        testListAdd4.add(new Object());
                        System.out.println(i);
                        if (i == 5) {
                            // 此处有坑
                            // 特别注意,nofity并不释放当前锁,t1 会继续往下走,等t1 执行完了, t2 才会拿到这把对象锁
                            // 所以下面加多一个操作,让此线程等待,阻塞此线程,让t2 去执行
                            lock.notify();
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1").start();
        }
    }
    

    结论
    小程序4基本上满足的需求,在小程序3的基础上了,nofity之后让本线程等待,让给t2执行,然后t2执行完毕,释放nofity,回到t1执行。

    小程序5

    package com.learn.thread.four.list;
    
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 这里使用CountDown实现
     */
    public class TestListAdd5 {
        private List<Object> list = new ArrayList<Object>(16);
    
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
    
        public static void main(String[] args) {
            TestListAdd5 testListAdd5 = new TestListAdd5();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            new Thread(() -> {
                System.out.println("t2 启动");
                if (testListAdd5.size() != 5) {
                    try {
                        countDownLatch.await();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("t2 结束");
            }, "t2").start();
            new Thread(() -> {
                System.out.println("t1 启动");
                for (int i =0; i < 10; i++) {
                    testListAdd5.add(new Object());
                    System.out.println(i);
                    if (testListAdd5.size() == 5) {
                        // 暂停t1 线程, 打开门阀
                        countDownLatch.countDown();
                    }
                    // 如果这段代码去除了,就不会给t2时间去执行,所以这里需要等待一会
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1 ").start();
        }
    }
    
    

    结论:
    从执行结果来看,并没有多大的问题,但是如果我们把休眠一秒的代码去掉,会发现,执行结果不正确,这事因为t1线程对象增加到5个时,t2的线程门阀确实被打开了,但是t1线程马上又会接着执行,t1之前是休眠1秒,给t2线程执行时间,如果注释掉这段代码,t2就没有机会去实时监控了。

    小程序6

    package com.learn.thread.four.list;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class TestListAdd6 {
        private List<Object> list = new ArrayList<Object>(16);
    
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
    
        public static void main(String[] args) throws InterruptedException {
            TestListAdd6 testListAdd6 = new TestListAdd6();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            new Thread(() -> {
                System.out.println("t2 启动");
                if (testListAdd6.size() != 5) {
                    try {
                        countDownLatch.await();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("t2 结束");
            }, "t2").start();
    
            new Thread(() -> {
                System.out.println("t1 启动");
                for (int i =0; i < 10; i++) {
                    testListAdd6.add(new Object());
                    System.out.println(i);
                    if (testListAdd6.size() == 5) {
                        //  打开门阀
                        countDownLatch.countDown();
                        // 等待
                        try {
                            countDownLatch.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
    
                }
            }, "t1").start();
        }
    }
    

    结论
    这段代码很好理解,就是在t1线程打开门阀的时候,给自己再加一个门阀,但是这个线程睡眠代码,还是不能去除。

    小程序7

    package com.learn.thread.four.list;
    
    import com.learn.thread.first.T;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.LockSupport;
    
    public class TestListAdd7 {
        private List<Object> list = new ArrayList<Object>(16);
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
        static Thread t2 = null;
        static Thread t1 = null;
        public static void main(String[] args) {
            TestListAdd7 testListAdd7 = new TestListAdd7();
            t1 = new Thread(() -> {
                System.out.println("t1 启动了");
                for (int i =0; i < 10; i++) {
                    testListAdd7.add(new Object());
                    System.out.println(i);
                    if (testListAdd7.size() == 5) {
                        LockSupport.unpark(t2);
                        LockSupport.park();
                    }
    
                }
            },"t1");
            t2 = new Thread(() -> {
                System.out.println("t2 启动了");
                if (testListAdd7.size() != 5) {
                    LockSupport.park();
                }
                System.out.println("t2 结束");
                LockSupport.unpark(t1);
            },"t2");
            t2.start();
            t1.start();
    
        }
    }
    

    结论:
    这次跟之前也是大同小异,只不过锁的方式不一样罢了

    小程序8

    package com.learn.thread.four.list;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    
    public class TestListAdd9 {
        private List<Object> list = new ArrayList<Object>(16);
        public void add(Object object) {
            this.list.add(object);
        }
        public int size() {
            return this.list.size();
        }
        static Thread t1,t2 = null;
        public static void main(String[] args) {
            TestListAdd8 testListAdd8 = new TestListAdd8();
            Semaphore semaphore = new Semaphore(1);
            t1 = new Thread(() -> {
                // 保证只有一个线程执行
                try {
                    semaphore.acquire();
                    for (int i = 0; i < 5; i++) {
                        testListAdd8.add(new Object());
                        System.out.println(i);
                    }
                    // 释放当前线程,让别的线程可以加入
                    semaphore.release();
                    t2.start();
                    t2.join();
                    // 此线程继续执行
                    semaphore.acquire();
                    for (int i = 5; i < 10; i++) {
                        testListAdd8.add(new Object());
                        System.out.println(i);
                    }
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            t2 = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("t2 结束");
                semaphore.release();
            });
            t1.start();
        }
    }
    

    结论
    本次是一次牵强的方法,用Semaphre限制每次只有一个线程执行,当t1增加到4的时候,释放当前线程,并且将cpu交给t2执行,这时候t2也是保证只有一个线程执行,执行完后立马释放,t1调用join完之后可以继续获得Semaphre的锁,然后继续增加对象,最后释放线程。

    针对以上8个小程序,我们分别用了volitate,wait,nofity,Semphore,CountDownLatch,LockSupport,其中wait和nofity要牢牢掌握

    2.3.淘宝面试题2

    写一个固定容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程阻塞调用

    小程序1

    package com.learn.thread.four.prodducer;
    
    import java.util.LinkedList;
    
    public class TestProducer<T> {
        private final LinkedList<T> list = new LinkedList<T>();
    
        public final int MAX = 10;
    
        private int count = 0;
        
        public synchronized void put(T t) {
            // 想想这里什么用while
            // 这是因为当linkedList集合中的个数达到最大值得时候,if判断了集合的大小等于MAX
            // 调用了wait方法,它不会再去判断一次,而是继续往下走,假如wait以后,有别的生产者线程添加数据,那么,这里就没有
            // 再次判断,又添加了一次,造成了数据错误
            while (list.size() == MAX) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(t);
            ++ count;
            System.out.println("生产者生产 " + count);
            // 唤醒所有在队列中等待的线程
            this.notifyAll();
        }
    
        public synchronized T get() {
            T t = null;
            while (list.size() == 0) {
                try {
                    this.wait();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "消费");
            t = list.removeFirst();
            count --;
            System.out.println(count);
            this.notifyAll();
            return t;
        }
    
        public static void main(String[] args) {
            TestProducer<String> testProducer = new TestProducer<>();
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 5; j++) {
                        System.out.println(testProducer.get());
                    }
                }, "c" + i).start();
    
            }
            for (int i = 0; i < 2; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 25; j++) {
                        testProducer.put(Thread.currentThread().getName() + "" + j);
                    }
                }, "p" + i).start();
    
            }
        }
    }
    

    这里有一个很遗憾的问题,那就是nofityAlll,如果生产者生产完,是有可能唤醒一个还是生产者的线程,所以这里可以做一个优化

    小程序2

    package com.learn.thread.four.prodducer;
    
    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class TestProducer2<T> {
        private final LinkedList<T> list = new LinkedList<T>();
    
        public final int MAX = 10;
    
        private int count = 0;
    
        private ReentrantLock lock = new ReentrantLock();
    
        Condition producer = lock.newCondition();
    
        Condition comsumer = lock.newCondition();
    
        public void put(T t) {
            // 想想这里什么用while
            // 这是因为当linkedList集合中的个数达到最大值得时候,if判断了集合的大小等于MAX
            // 调用了wait方法,它不会再去判断一次,而是继续往下走,假如wait以后,有别的生产者线程添加数据,那么,这里就没有
            // 再次判断,又添加了一次,造成了数据错误
            try {
                lock.lock();
                while (list.size() == MAX) {
                    try {
                        producer.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.add(t);
                ++ count;
                System.out.println("生产者生产 " + count);
                // 唤醒所有消费者等待的线程
                comsumer.signalAll();
            }catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                lock.unlock();
            }
    
        }
    
        public T get() {
            T t = null;
            try {
                lock.lock();
                while (list.size() == 0) {
                    try {
                        comsumer.await();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "消费");
                t = list.removeFirst();
                count --;
                System.out.println(count);
                producer.signalAll();
            }catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                lock.unlock();
            }
    
            return t;
        }
    
        public static void main(String[] args) {
            TestProducer2<String> testProducer = new TestProducer2<>();
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 5; j++) {
                        System.out.println(testProducer.get());
                    }
                }, "c" + i).start();
    
            }
            for (int i = 0; i < 2; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 25; j++) {
                        testProducer.put(Thread.currentThread().getName() + "" + j);
                    }
                }, "p" + i).start();
    
            }
        }
    }
    

    这里ReentrantLock的优势就体现出来了,它可以有多种情况Condition,在put方法了达到了峰值就是生产者producer.await,反之就是comsumer.await
    ReentrantLock的本质就是synchronzied里调用wait和nofity的时候,它只有一个等待队列,但是用了Condition就是多个等待队列。当我们使用producer.await的时候,就是进入了producer的等待队列,producer.signalAll指的是唤醒producer这个等待队列的线程。

    相关文章

      网友评论

          本文标题:java多线程与高并发(五)LockSupport

          本文链接:https://www.haomeiwen.com/subject/udclrltx.html