美文网首页
(二)线程间的共享和协作

(二)线程间的共享和协作

作者: 讲道理很没道理 | 来源:发表于2019-05-16 15:38 被阅读0次

    线程间的共享

    synchronized内置锁

    线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,包括数据之间的共享,协同处理事情。这将会带来巨大的价值。
    Java支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。

    对象锁和类锁:

    对象锁是用于对象实例方法,或者一个对象实例上的,类锁是用于类的静态方法或者一个类的class对象上的。我们知道,类的对象实例可以有很多个,但是每个类只有一个class对象,所以不同对象实例的对象锁是互不干扰的,但是每个类只有一个类锁。
    但是有一点必须注意的是,其实类锁只是一个概念上的东西,并不是真实存在的,类锁其实锁的是每个类的对应的class对象。类锁和对象锁之间也是互不干扰的。
    对象锁和类锁,以及锁static变量之间的运行情况

    /**
     * @author sxylml
     * @Date : 2019/4/26 13:29
     * @Description:
     */
    public class SynTest {
    
        private long count = 0;
        /**
         * 作为一个锁
         */
        private Object object = new Object();
        private int anInt;
    
        public long getCount() {
            return count;
        }
    
        public void setCount(long count) {
            this.count = count;
        }
    
        public void incCount() {
            this.count++;
        }
    
        /**
         * synchronized 用在同步块上
         */
        public void incCount2() {
            synchronized (object) {
                this.count++;
            }
        }
    
        /**
         * 用在方法上
         */
        public synchronized void incCount3() {
            this.count++;
        }
    
        /**
         * 用在同步块上,但是锁的是当前类的对象实例
         */
        public void incCount4() {
            synchronized (this) {
                this.count++;
            }
        }
    
    
        private static class Count extends Thread {
    
            private SynTest simplOper;
    
            public Count(SynTest simplOper) {
                this.simplOper = simplOper;
            }
    
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    //1:没加锁结果 大部分应该不全等于预计的值
                    simplOper.incCount();
    //               加锁后结果是正确的
    //                simplOper.incCount2();
    //                simplOper.incCount3();
    //                simplOper.incCount4();
                }
            }
        }
    
    
        public static void main(String[] args) throws InterruptedException {
    
            int max = 100;
            for (int i = 0; i < max; i++) {
                SynTest simplOper = new SynTest();
    //            循环启动 max 个线程
                for (int j = 0; j < max; j++) {
                    new Count(simplOper).start();
                }
                Thread.sleep(100);
                System.out.println("第" + (i + 1) + "次测试:count = " + simplOper.count);
            }
    
    
        }
    
    }
    
    

    错误的加锁和原因分析

    /**
     * @author sxylml
     * @Date : 2019/5/10 15:05
     * @Description: 错误的加锁和原因分析
     */
    public class TestIntegerSyn {
    
        public static void main(String[] args) {
            Worker worker = new Worker(1);
            for (int i = 0; i < 5; i++) {
                new Thread(worker).start();
            }
        }
    
        private static class Worker implements Runnable {
    
            private Integer i;
            private Object o = new Object();
    
            public Worker(Integer i) {
                this.i = i;
            }
    
            @Override
            public void run() {
    //           错误的加锁: 锁的对象i 发生了变化,导致结果并不正确, 所以需要正确的加锁:锁的对象要不变。是同一个对象
                synchronized (i) {
    //                正确的加锁
    //            synchronized (o) {
    //            synchronized (this) {
                    Thread thread = Thread.currentThread();
                    System.out.println(thread.getName() + "--@" + System.identityHashCode(i));
    //                反编译后可以发现 i++不是原子性的 会创建新的对象 参考 valueOf
                    i++;
    //                正确应该:2,3,4,5,6                    System.identityHashCode 近似的理解为地址
                    System.out.println("线程名:" + thread.getName() + "-------i的值[" + i + "]  i的地址 -@" + System.identityHashCode(i));
                    System.out.println("线程名:" + thread.getName() + "-------i的值[" + i + "] this 的地址 -@" + System.identityHashCode(this));
                    System.out.println("线程名:" + thread.getName() + "-------i的值[" + i + "]  o的地址 -@" + System.identityHashCode(o));
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    //                System.out.println(thread.getName() + "-------" + i + "--@" + System.identityHashCode(i));
                }
            }
        }
    }
    
    

    image.png

    通过反编译后可以看出i++ 实际操作是:

    image.png

    Integer.valueOf() 源码:

    image.png

    结论:本质上是返回了一个新的Integer对象。也就是实际加锁的是不同的Integer对象。

    volatile, 最轻量的同步机制

    volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了变量的值,这新值对其它线程来说是立即可见的。
    如下面代码:不加volatile 关键字的时候,子线程无法感知主线程修改了 ready的值,从而不会退出循环,而加了volatile 子线程能感知到主线程修改了ready的值,从而退出循环。
    但是:volatile 不能保证数据在多个线程下同时写时的线程安全。
    volatile 只保证可见性,不保证原子性。

    import tools.SleepTools;
    /**
     * @author sxylml
     * @Date : 2019/5/11 13:43
     * @Description: 演示Volatile的提供的可见性
     */
    public class VolatileCase {
        //    private static boolean ready;
        private volatile static boolean ready;
        private static int number;
    
        private static class PrintThread extends Thread {
            @Override
            public void run() {
                System.out.println("PrintThread is running.......");
                //无限循环 通过主线程修改ready 的值  看这子线程能感知到变化跳出循环不
                while (!ready) {
                }
                System.out.println("number = " + number);
    
            }
        }
    
        public static void main(String[] args) {
            new PrintThread().start();
            SleepTools.second(1);
            number = 51;
            ready = true;
            SleepTools.second(5);
            System.out.println("main is ended!");
        }
    
    }
    
    

    运行结果:

    没有volatitle关键字
    image.png
    有volatitle关键字
    image.png

    演示volatile 的不安全性

    package com.ch1.vola;
    
    /**
     * @author sxylml
     * @Date : 2019/5/11 14:01
     * @Description: 演示volatile 的不安全性。没有原子性
     */
    public class NotSafe {
        private volatile long count = 0;
    
        public long getCount() {
            return count;
        }
    
        public void setCount(long count) {
            this.count = count;
        }
    
        //count进行累加
        public void incCount() {
            count++;
        }
    
        //线程
        private static class Count extends Thread {
    
            private NotSafe simplOper;
    
            public Count(NotSafe simplOper) {
                this.simplOper = simplOper;
            }
    
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    simplOper.incCount();
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            NotSafe simplOper = new NotSafe();
            //启动两个线程
            Count count1 = new Count(simplOper);
            Count count2 = new Count(simplOper);
            count1.start();
            count2.start();
            Thread.sleep(50);
            System.out.println(simplOper.count);//20000?
        }
    }
    
    

    不能保证运行结果每次的正确性。


    image.png

    ThreadLocal 和Synchronized 都可以用来解决多线程并发访问。
    可是ThreadLocal 和Synchronized 有本质上的差别。
    synchronized 是利用锁的机制,使得代码或者代码块在某一时刻仅仅只能被一个线程访问。
    而ThreadLocal 是为每一个线程都提供了变量副本,使得每个线程在某以时间访问到的并非同一个对象,这样就隔离了多个线程对数据的数据共享。
    Spring 的事务就是借助了ThreadLocal 类,Spring 会从数据库连接池中获取一个connection, 然后把connection 放入ThreadLocal中,也就和线程绑定了,事务需要提交或者回滚,只要从ThreadLocal 中拿到connection 进行操作。
    为何Spring的事务要借助ThreadLocal 类。
    以JDBC 为例:正常的事务可能如下:
    dbc = new DataBaseConnection();//第1行
    Connection con = dbc.getConnection();//第2行
    con.setAutoCommit(false);// //第3行
    con.executeUpdate(...);//第4行
    con.executeUpdate(...);//第5行
    con.executeUpdate(...);//第6行
    con.commit();////第7行
    上述代码,可以分成三个部分:
    事务准备阶段:第1~3行
    业务处理阶段:第4~6行
    事务提交阶段:第7行
    可以很明显的看到,不管我们开启事务还是执行具体的sql都需要一个具体的数据库连接。
    现在我们开发应用一般都采用三层结构,如果我们控制事务的代码都放在DAO(DataAccessObject)对象中,在DAO对象的每个方法当中去打开事务和关闭事务,当Service对象在调用DAO时,如果只调用一个DAO,那我们这样实现则效果不错,但往往我们的Service会调用一系列的DAO对数据库进行多次操作,那么,这个时候我们就无法控制事务的边界了,因为实际应用当中,我们的Service调用的DAO的个数是不确定的,可根据需求而变化,而且还可能出现Service调用Service的情况。


    image.png
    image.png

    但是需要注意一个问题,如何让三个DAO使用同一个数据源连接呢?我们就必须为每个DAO传递同一个数据库连接,要么就是在DAO实例化的时候作为构造方法的参数传递,要么在每个DAO的实例方法中作为方法的参数传递。这两种方式无疑对我们的Spring框架或者开发人员来说都不合适。为了让这个数据库连接可以跨阶段传递,又不显示的进行参数传递,就必须使用别的办法。
    Web容器中,每个完整的请求周期会由一个线程来处理。因此,如果我们能将一些参数绑定到线程的话,就可以实现在软件架构中跨层次的参数共享(是隐式的共享)。而JAVA中恰好提供了绑定的方法--使用ThreadLocal。
    结合使用Spring里的IOC和AOP,就可以很好的解决这一点。
    只要将一个数据库连接放入ThreadLocal中,当前线程执行时只要有使用数据库连接的地方就从ThreadLocal获得就行了。

    ThreadLocal 的使用

    ThreadLocal类接口很简单,只有4个方法,我们先来了解一下:
    • void set(Object value)
    设置当前线程的线程局部变量的值。
    • public Object get()
    该方法返回当前线程所对应的线程局部变量。
    • public void remove()
    将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。
    • protected Object initialValue()
    返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次。ThreadLocal中的缺省实现直接返回一个null。
    public final static ThreadLocal<String> RESOURCE = new ThreadLocal<String>();
    RESOURCE代表一个能够存放String类型的ThreadLocal对象。此时不论什么一个线程能够并发访问这个变量,对它进行写入、读取操作,都是线程安全的。


    image.png image.png
    image.png

    上面先取到当前线程,然后调用getMap方法获取对应的ThreadLocalMap,ThreadLocalMap是ThreadLocal的静态内部类,然后Thread类中有一个这样类型成员,所以getMap是直接返回Thread的成员。

    看下ThreadLocal的内部类ThreadLocalMap部分源码:

    image.png

    可以看到有个Entry内部静态类,它继承了WeakReference,总之它记录了两个信息,一个是ThreadLocal<?>类型,一个是Object类型的值。getEntry方法则是获取某个ThreadLocal对应的值,set方法就是更新或赋值相应的ThreadLocal对应的值。


    image.png
    image.png

    回顾我们的get方法,其实就是拿到每个线程独有的ThreadLocalMap
    然后再用ThreadLocal的当前实例,拿到Map中的相应的Entry,然后就可以拿到相应的值返回出去。当然,如果Map为空,还会先进行map的创建,初始化等工作。

    引发的内存泄漏分析

    image.png

    这个o,我们可以称之为对象引用,而new Object()我们可以称之为在内存中产生了一个对象实例。
    当写下 o=null时,只是表示o不再指向堆中object的对象实例,不代表这个对象实例不存在了。

    强引用就是指在程序代码之中普遍存在的,类似“Object obj=new Object()”这类的引用,只要强引用还存在,垃圾收集器永远不会回收掉被引用的对象实例。

    软引用是用来描述一些还有用但并非必需的对象。对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象实例列进回收范围之中进行第二次回收。如果这次回收还没有足够的内存,才会抛出内存溢出异常。在JDK 1.2之后,提供了SoftReference类来实现软引用。

    弱引用也是用来描述非必需对象的,但是它的强度比软引用更弱一些,被弱引用关联的对象实例只能生存到下一次垃圾收集发生之前。当垃圾收集器工作时,无论当前内存是否足够,都会回收掉只被弱引用关联的对象实例。在JDK 1.2之后,提供了WeakReference类来实现弱引用。

    虚引用也称为幽灵引用或者幻影引用,它是最弱的一种引用关系。一个对象实例是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来取得一个对象实例。为一个对象设置虚引用关联的唯一目的就是能在这个对象实例被收集器回收时收到一个系统通知。在JDK 1.2之后,提供了PhantomReference类来实现虚引用。

    内存泄漏的现象

    package com.ch1.threadlocal;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author sxylml
     * @Date : 2019/5/10 16:45
     * @Description:  使用不同的场景查看对应的内存使用情况
     */
    public class ThreadLocalMemoryLeak {
        private static final int TASK_LOOP_SIZE = 100;
    
        final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    
        static class LocalVariable {
            /**
             * 5M大小的数组  1MB=1024KB
             */
            private byte[] a = new byte[1024 * 1024 * 5];
        }
    
        ThreadLocal<LocalVariable> localVariable;
        //= new ThreadLocal<>();
    
        public static void main(String[] args) throws InterruptedException {
            /*5*5=25*/
            for (int i = 0; i < TASK_LOOP_SIZE; ++i) {
                poolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 场景1:什么都不执行
    
    
                        // 场景2:每个任务中new出一个数组
                        //   new LocalVariable();
    
    
                        //场景3 启用ThreadLocal 没有使用remove()
                        ThreadLocalMemoryLeak oom = new ThreadLocalMemoryLeak();
                        oom.localVariable = new ThreadLocal<>();
                        oom.localVariable.set(new LocalVariable());
                        // new LocalVariable();
    
                        System.out.println("use local varaible");
                        // 通过 visualVM 查看内存使用情况,没用调用remove() 和调用remove() 的差距
    
                        // 场景4:在3的基础上使用remove()
                        //oom.localVariable.remove();
    
    
                    }
                });
    
                Thread.sleep(100);
            }
            System.out.println("pool execute over");
        }
    }
    
    

    场景1,首先任务中不执行任何有意义的代码,当所有的任务提交执行完成后,可以看见,我们这个应用的内存占用基本上为25M左右

    image.png

    场景2,然后我们只简单的在每个任务中new出一个数组,执行完成后我们可以看见,内存占用。

    image.png

    场景3,当我们启用了ThreadLocal以后内存飙升到200M左右

    image.png
    场景4,当我们启用了ThreadLocal 并且加入remove() 方法后内存又回到场景2差不多。
    image.png

    根据我们前面对ThreadLocal的分析,我们可以知道每个Thread 维护一个 ThreadLocalMap,这个映射表的 key 是 ThreadLocal实例本身,value 是真正需要存储的 Object,也就是说 ThreadLocal 本身并不存储值,它只是作为一个 key 来让线程从 ThreadLocalMap 获取 value。仔细观察ThreadLocalMap,这个map是使用 ThreadLocal 的弱引用作为 Key 的,弱引用的对象在 GC 时会被回收。
    因此使用了ThreadLocal后,引用链如图所示


    image.png

    图中的虚线表示弱引用。
    这样,当把threadlocal变量置为null以后,没有任何强引用指向threadlocal实例,所以threadlocal将会被gc回收。这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value,而这块value永远不会被访问到了,所以存在着内存泄露。
    只有当前thread结束以后,current thread就不会存在栈中,强引用断开,Current Thread、Map value将全部被GC回收。最好的做法是不在需要使用ThreadLocal变量后,都调用它的remove()方法,清除数据。
    所以回到我们前面的实验场景,场景3中,虽然线程池里面的任务执行完毕了,但是线程池里面的5个线程会一直存在直到JVM退出,我们set了线程的localVariable变量后没有调用localVariable.remove()方法,导致线程池里面的5个线程的threadLocals变量里面的new LocalVariable()实例没有被释放。
    其实考察ThreadLocal的实现,我们可以看见,无论是get()、set()在某些时候,调用了expungeStaleEntry方法用来清除Entry中Key为null的Value,但是这是不及时的,也不是每次都会执行的,所以一些情况下还是会发生内存泄露。只有remove()方法中显式调用了expungeStaleEntry方法。
    从表面上看内存泄漏的根源在于使用了弱引用,但是另一个问题也同样值得思考:为什么使用弱引用而不是强引用?

    下面我们分两种情况讨论:

    key 使用强引用:对ThreadLocal对象实例的引用被置为null了,但是ThreadLocalMap还持有这个ThreadLocal对象实例的强引用,如果没有手动删除,ThreadLocal的对象实例不会被回收,导致Entry内存泄漏。
    key 使用弱引用:对ThreadLocal对象实例的引用被被置为null了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal的对象实例也会被回收。value在下一次ThreadLocalMap调用set,get,remove都有机会被回收。
    比较两种情况,我们可以发现:由于ThreadLocalMap的生命周期跟Thread一样长,如果都没有手动删除对应key,都会导致内存泄漏,但是使用弱引用可以多一层保障。
    因此,ThreadLocal内存泄漏的根源是:由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key就会导致内存泄漏,而不是因为弱引用。

    总结

    JVM利用设置ThreadLocalMap的Key为弱引用,来避免内存泄露。
    JVM利用调用remove、get、set方法的时候,回收弱引用。
    当ThreadLocal存储很多Key为null的Entry的时候,而不再去调用remove、get、set方法,那么将导致内存泄漏。
    使用线程池+ ThreadLocal时要小心,因为这种情况下,线程是一直在不断的重复运行的,从而也就造成了value可能造成累积的情况。

    错误使用ThreadLocal导致线程不安全

    package com.ch1.threadlocal;
    
    import tools.SleepTools;
    
    /**
     * @author sxylml
     * @Date : 2019/5/11 13:14
     * @Description: ThreadLocal的线程不安全演示
     */
    public class ThreadLocalUnsafe implements Runnable {
    
        // 错误的方式
        public static Number number = new Number(0);
    
        //public  Number number = new Number(0);
    
        public static ThreadLocal<Number> value = new ThreadLocal<Number>() {
        };
    
        @Override
        public void run() {
            //每个线程计数加一
            number.setNum(number.getNum() + 1);
            //将其存储到ThreadLocal中
            value.set(number);
            SleepTools.ms(2);
            //输出num值
            System.out.println(Thread.currentThread().getName() + "=" + value.get().getNum());
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 5; i++) {
                new Thread(new ThreadLocalUnsafe()).start();
            }
        }
    
    
        private static class Number {
            public Number(int num) {
                this.num = num;
            }
    
            private int num;
    
            public int getNum() {
                return num;
            }
    
            public void setNum(int num) {
                this.num = num;
            }
    
            @Override
            public String toString() {
                return "Number [num=" + num + "]";
            }
        }
    
    
    }
    
    
    image.png

    为什么每个线程都输出5?难道他们没有独自保存自己的Number副本吗?为什么其他线程还是能够修改这个值?仔细考察ThreadLocal和Thead的代码,我们发现ThreadLocalMap中保存的其实是对象的一个引用,这样的话,当有其他线程对这个引用指向的对象实例做修改时,其实也同时影响了所有的线程持有的对象引用所指向的同一个对象实例。这也就是为什么上面的程序为什么会输出一样的结果:5个线程中保存的是同一Number对象的引用,在线程睡眠的时候,其他线程将num变量进行了修改,而修改的对象Number的实例是同一份,因此它们最终输出的结果是相同的。
    而上面的程序要正常的工作,应该的用法是让每个线程中的ThreadLocal都应该持有一个新的Number对象。

    线程间的协作

    线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),简单的办法是让消费者线程不断地循环检查变量是否符合预期在while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作。却存在如下问题:
    1) 难以确保及时性。
    2)难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。

    等待/通知机制

    是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

    notify():

    通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入WAITING状态。

    notifyAll():

    通知所有等待在该对象上的线程

    wait()

    调用该方法的线程进入 WAITING状态,只有等待另外线程的通知或被中断才会返回.需要注意,调用wait()方法后,会释放对象的锁

    wait(long)

    超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回

    wait (long,int)

    对于超时时间更细粒度的控制,可以达到纳秒

    等待和通知的标准范式

    等待方遵循如下原则。

    1)获取对象的锁。
    2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
    3)条件满足则执行对应的逻辑。


    image.png
    通知方遵循如下原则。

    1)获得对象的锁。
    2)改变条件。
    3)通知所有等待在对象上的线程。


    image.png

    采用多线程技术,例如wait/notify,设计实现一个符合生产者和消费者问题的程序,
    对某一个对象(枪膛)进行操作,其最大容量是20颗子弹,
    生产者线程是一个压入线程,它不断向枪膛中压入子弹,消费者线程是一个射出线程,它不断从枪膛中射出子弹。

    package com.ch1.wn.gun;
    
    import tools.SleepTools;
    
    /**
     * @author sxylml
     * @Date : 2019/5/14 17:06
     * @Description: 采用多线程技术,例如wait/notify,设计实现一个符合生产者和消费者问题的程序,
     * 对某一个对象(枪膛)进行操作,其最大容量是20颗子弹,
     * 生产者线程是一个压入线程,它不断向枪膛中压入子弹,消费者线程是一个射出线程,它不断从枪膛中射出子弹。
     */
    public class GunDemo {
    
        public static void main(String[] args) {
            Gun gun = new Gun();
            new Thread(new PressIn(gun)).start();
            new Thread(new Shooting(gun)).start();
    
        }
    
        static class PressIn implements Runnable {
            Gun gun;
    
            public PressIn(Gun gun) {
                this.gun = gun;
            }
    
            @Override
            public void run() {
                while (true) {
                    gun.pressInBullet();
                    SleepTools.ms(400);
                }
            }
        }
    
        static class Shooting implements Runnable {
    
            Gun gun;
    
            public Shooting(Gun gun) {
                this.gun = gun;
            }
    
            @Override
            public void run() {
                while (true) {
                    gun.shooting();
                    SleepTools.ms(600);
                }
            }
        }
    
    
        static public class Gun {
    
            /**
             * 容量20
             */
            private final static int CAPACITY = 20;
    
            private int currentBulletNumber;
    
            synchronized void pressInBullet() {
    
                while (currentBulletNumber >= CAPACITY) {
                    System.out.println("弹夹已满,等待射击");
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                currentBulletNumber++;
                System.out.println("弹匣未满,开始装弹,现有子弹:" + currentBulletNumber);
                notifyAll();
    
    
            }
    
            synchronized void shooting() {
    
                while (currentBulletNumber <= 0) {
                    System.out.println("弹匣已空,无法射击");
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                System.out.println("弹匣有弹,开始射击,现有子弹:" + currentBulletNumber);
                currentBulletNumber--;
                System.out.println("射击完还剩子弹:" + currentBulletNumber);
                notifyAll();
    
    
            }
        }
    
    
    }
    
    

    在调用wait()、notify()系列方法之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法、notify()系列方法,进入wait()方法后,当前线程释放锁,在从wait()返回前,线程与其他线程竞争重新获得锁, 执行notify()系列方法的线程退出调用了notifyAll的synchronized代码块的时候后,他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出synchronized代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。
    notify和notifyAll应该用谁
    尽可能用notifyall(),谨慎使用notify(),因为notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程,

    package com.ch1.wn;
    
    /**
     * @author sxylml
     * @Date : 2019/5/14 17:40
     * @Description:
     */
    public class ExpressDemo {
    
        private static Express express = new Express(0, Express.CITY);
    //    private static MyExpress express = new MyExpress(0, com.ch1.wn.Express.CITY);
    
        /**
         * 检查里程数变化的线程,不满足条件,线程一直等待
         */
        private static class CheckKm extends Thread {
            @Override
            public void run() {
                express.waitKm();
            }
        }
    
    
        /**
         * 检查地点变化的线程,不满足条件,线程一直等待
         */
        private static class CheckSite extends Thread {
            @Override
            public void run() {
                express.waitSite();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 3; i++) {
                new CheckSite().start();
            }
    
            for (int i = 0; i < 3; i++) {
                new CheckKm().start();
            }
    
            Thread.sleep(1000);
            express.changeKm();//快递地点变化
            express.changeSite();
        }
    
    
        static class Express {
    
            public final static String CITY = "ShengHai";
    
            /**
             * 快递运输里程数
             */
            private int km;
    
            /**
             * 快递到达地点
             */
            private String site;
    
    
            public Express(int km, String site) {
                this.km = km;
                this.site = site;
            }
    
    
       /**
         * 通知方遵循如下原则。
         * 1)获得对象的锁。
         * 2)改变条件。
         * 3)通知所有等待在对象上的线程。
         */
    
    
            /**
             * 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理
             */
            public synchronized void changeKm() {
    
                this.km = 101;
                notify();
    
            }
    
            /**
             * 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理
             */
            public synchronized void changeSite() {
                this.site = "BeiJing";
                notifyAll();
            }
    
    
    
    
        /**
         * 等待方遵循如下原则。
         * 1)获取对象的锁。
         * 2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
         * 3)条件满足则执行对应的逻辑。
         */
    
            /**
             * 线程等待公里的变化
             */
            public synchronized void waitKm() {
    
                while (this.km < 100) {
                    try {
                        wait();
                        System.out.println("Check km thread[" + Thread.currentThread().getId() + "] is be notified");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("the Km is " + this.km + ",I will change db");
                }
    
            }
    
            /**
             * 线程等待目的地的变化
             */
            public synchronized void waitSite() {
    
                //到达目的地
                while (this.site.equals(CITY)) {
    
                    try {
                        wait();
                        System.out.println("Check Site thread[" + Thread.currentThread().getId() + "] is be notified");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("the site is " + this.site + ",I will call user");
    
                }
    
            }
        }
    }
    
    

    等待超时模式实现一个连接池

    调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。
    假设等待时间段是T,那么可以推断出在当前时间now+T之后就会超时
    等待持续时间:REMAINING=T。
    •超时时间:FUTURE=now+T。
    // 对当前对象加锁
    public synchronized Object get(long mills) throws InterruptedException {
    long future = System.currentTimeMillis() + mills;
    long remaining = mills;
    // 当超时大于0并且result返回值不满足要求
    while ((result == null) && remaining > 0) {
    wait(remaining);
    remaining = future - System.currentTimeMillis();
    }
    return result;
    }

    客户端获取连接的过程被设定为等待超时的模式,也就是在1000毫秒内如果无法获取到可用连接,将会返回给客户端一个null。设定连接池的大小为10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。
    它通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用方需要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用releaseConnection(Connection)方法将连接放回线程池

    模拟继承Connection接口
    public class SqlConnectImpl implements Connection {
       /*拿一个数据库连接*/
        public static final Connection fetchConnection(){
            return new SqlConnectImpl();
        }
    
      @Override
        public void commit() throws SQLException {
            SleepTools.ms(70);
        }
    
        @Override
        public Statement createStatement() throws SQLException {
            SleepTools.ms(1);
            return null;
        }
    }
    
    实现一个数据库连接池
    package com.ch1.pool;
    
    import java.sql.Connection;
    import java.util.LinkedList;
    
    /**
     * @author sxylml
     * @Date : 2019/5/16 14:08
     * @Description:
     */
    public class DataBasePool {
    
        /**
         * 连接池
         */
        public static LinkedList<Connection> pool = new LinkedList<>();
    
    
        public DataBasePool(int initalSize) {
            if (initalSize > 0) {
                for (int i = 0; i < initalSize; i++) {
                    pool.addLast(SqlConnectImpl.fetchConnection());
                }
            }
        }
    
        /**
         * 释放连接,通知其他的等待连接的线程
         *
         * @param connection
         */
        public void releaseConnection(Connection connection) {
            if (connection != null) {
                //1: 获得对象的锁。
                synchronized (pool) {
                    //2:改变条件
                    pool.addLast(connection);
                    //3:通知所有等待在对象上的线程。
                    pool.notifyAll();
                }
            }
        }
    
        /**
         *
         * @param mills
         * @return
         * @throws InterruptedException
         */
    
        public Connection fetchConnection(long mills) throws InterruptedException {
    
            //1获取对象的锁。
            synchronized (pool) {
                // 永不超时模式
                if (mills <= 0) {
                    //2:如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
                    while (pool.isEmpty()) {
                        try {
                            pool.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //3:条件满足则执行对应的逻辑。
                    return pool.removeFirst();
                } else {
    
                    /*超时时刻*/
                    long future = System.currentTimeMillis() + mills;
                      /*等待时长*/
                    long remaining = mills;
                    while (pool.isEmpty() && remaining > 0) {
                        pool.wait(remaining);
                        /*唤醒一次,重新计算等待时长*/
                        remaining = future - System.currentTimeMillis();
                    }
                    Connection connection = null;
                    if (!pool.isEmpty()) {
                        connection = pool.removeFirst();
                    }
                    return connection;
                }
            }
        }
    }
    
    
    测试
    package com.ch1.pool;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author sxylml
     * @Date : 2019/5/16 14:50
     * @Description:
     */
    public class DataBasePoolTest {
    
    
        static DataBasePool pool = new DataBasePool(10);
    
        /**
         * 控制器:控制main线程将会等待所有Woker结束后才能继续执行
         */
        static CountDownLatch countDownLatch;
    
    
        public static void main(String[] args) throws InterruptedException {
    
            int threadCount = 50;
            countDownLatch = new CountDownLatch(threadCount);
            //每个线程的操作次数
            int count = 20;
            //计数器:统计可以拿到连接的线程
            AtomicInteger got = new AtomicInteger();
            //计数器:统计没有拿到连接的线程
            AtomicInteger notGot = new AtomicInteger();
    
            for (int i = 0; i < threadCount; i++) {
                new Thread(new Worker(count, got, notGot), "worker_" + i).start();
    
    
            }
            countDownLatch.await();// main线程在此处等待
            System.out.println("总共尝试了: " + (threadCount * count));
            System.out.println("拿到连接的次数:  " + got);
            System.out.println("没能连接的次数: " + notGot);
    
        }
    
        static class Worker implements Runnable {
    
            /**
             * 每个线程的操作次数
             */
            int count;
    
            /**
             * 计数器:统计可以拿到连接的线程
             */
            AtomicInteger got;
    
            /**
             * 计数器:统计没有拿到连接的线程
             */
            AtomicInteger notGot;
    
            public Worker(int count, AtomicInteger got, AtomicInteger notGot) {
                this.count = count;
                this.got = got;
                this.notGot = notGot;
            }
    
            @Override
            public void run() {
                while (count > 0) {
                    try {
                        Connection connection = pool.fetchConnection(1000);
    
                        if (connection != null) {
                            //获取到连接
                            try {
                                //模拟数据操作
                                connection.createStatement();
                                //PreparedStatement preparedStatement = connection.prepareStatement("");
                                //preparedStatement.execute();
                                connection.commit();
                            } catch (SQLException e) {
                                e.printStackTrace();
                            } finally {
                                //使用完毕归还连接
                                pool.releaseConnection(connection);
                                got.incrementAndGet();
                                System.out.println(Thread.currentThread().getName() + "获取到连接使用后并且归还!次数:" + got);
                            }
    
                        } else {
                            // 没有获取到连接
                            notGot.incrementAndGet();
                            System.out.println(Thread.currentThread().getName() + "等待超时!");
                        }
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        count--;
                    }
                }
                countDownLatch.countDown();
            }
        }
    
    }
    
    

    面试题

    调用yield() 、sleep()、wait()、notify()等方法对锁有何影响?
    yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。
    调用wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行wait方法后面的代码。
    调用notify()系列方法后,对锁无影响,线程只有在syn同步代码执行完后才会自然而然的释放锁,所以notify()系列方法一般都是syn同步代码的最后一行。

    相关文章

      网友评论

          本文标题:(二)线程间的共享和协作

          本文链接:https://www.haomeiwen.com/subject/qbpiaqtx.html