美文网首页
四、Java线程间通信

四、Java线程间通信

作者: 沉沦2014 | 来源:发表于2018-12-11 21:48 被阅读20次

    摘自《Java并发编程的艺术》

    1 volatile和synchronized关键字

    关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性

    关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性

    通过使用javap工具查看生成的class文件信息来分析synchronized关键字的实现细节,代码如下

    public class Synchronized {
    
        public static void main(String[] args) {
            synchronized (Synchronized.class){
                m();
            }
        }
        public static synchronized void m(){
        }
    }
    

    执行javap -v Synchronized.class,部分相关输出如下所示:

    对于同步块的实现使用了 monitorentermonitorexit 指令,而同步方法则是依赖方法修饰符上的ACC_SYNCHRONIZED来完成。无论采用哪种方式,其本质是对一个对象的监视器进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。

    任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED状态。

    下图描述了对象、对象的监视器、同步队列和执行线程之间的关系:

    从图中可以看到,任意线程对Object(Object由synchronized保护)的访问,首先要获得Object的监视器。如果获取失败,线程进入同步队列,线程状态变为BLOCKED。当访问Object的前驱(获得了锁的线程)释放了锁,则该释放操作唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取。

    2 等待 / 通知机制

    等待/通知机制是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方通知方之间的交互工作。

    下面所示的例子中,创建了两个线程——WaitThread和NotifyThread,前者检查flag值是否为false,如果符合要求,进行后续操作,否则在lock上等待,后者在睡眠了一段时间后对lock进行通知,示例如下所示。

    public class WaitNotify {
        static boolean flag = true;
        static Object lock = new Object();
     
        public static void main(String[] args) throws Exception {
            Thread waitThread = new Thread(new Wait(), "WaitThread");
            waitThread.start();
            TimeUnit.SECONDS.sleep(1);
            Thread notifyThread = new Thread(new Notify(), "NotifyThread");
            notifyThread.start();
        }
     
        static class Wait implements Runnable {
            public void run() {
                // 加锁,拥有lock的Monitor
                synchronized (lock) {
                    // 当条件不满足时,继续wait,同时释放了lock的锁
                    while (flag) {
                        try {
                            System.out.println(Thread.currentThread()
                                    + " flag is true. wait@ "
                                    + new SimpleDateFormat("HH:mm:ss")
                                            .format(new Date()));
                            lock.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                    // 条件满足时,完成工作
                    System.out.println(Thread.currentThread()
                            + " flag is false. running@ "
                            + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                }
            }
        }
     
        static class Notify implements Runnable {
            public void run() {
                // 加锁,拥有lock的Monitor
                synchronized (lock) {
                    // 获取lock的锁,然后进行通知,通知时不会释放lock的锁,
                    // 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
                    System.out.println(Thread.currentThread()
                            + " hold lock. notify @ "
                            + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    lock.notifyAll();
                    flag = false;
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 再次加锁
                synchronized (lock) {
                    System.out.println(Thread.currentThread()
                            + " hold lock again. sleep@ "
                            + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    输出如下(输出内容可能不同,主要区别在时间上)

    Thread[WaitThread,5,main] flag is true. wait@ 16:15:44
    Thread[NotifyThread,5,main] hold lock. notify @ 16:15:45
    Thread[NotifyThread,5,main] hold lock again. sleep@ 16:15:50
    Thread[WaitThread,5,main] flag is false. running@ 16:15:55
    

    上述第3行和第4行输出的顺序可能会互换,而上述例子主要说明了调用wait()、notify()以及notifyAll()时需要注意的细节,如下。

    1. 使用wait()、notify()和notifyAll()时需要先对调用对象加锁
    2. 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
    3. notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回
    4. notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
    5. 从wait()方法返回的前提是获得了调用对象的锁

    从上述细节中可以看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从wait()方法返回时能够感知到通知线程对变量做出的修改。下图描述了上述示例的过程。

    在图中,WaitThread首先获取了对象的锁,然后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行。

    2.1 生产者/消费者模式

    Consumer.java

    public class Consumer extends Thread {
        // 每次消费的产品数量
        private int num;
     
        // 所在放置的仓库
        private Storage storage;
     
        // 构造函数,设置仓库
        public Consumer(Storage storage) {
            this.storage = storage;
        }
     
        // 线程run函数
        public void run() {
            consume(num);
        }
     
        // 调用仓库Storage的生产函数
        public void consume(int num) {
            storage.consume(num);
        }
     
        // get/set方法
        public int getNum() {
            return num;
        }
     
        public void setNum(int num) {
            this.num = num;
        }
     
        public Storage getStorage() {
            return storage;
        }
     
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
    

    Producer.java

    public class Producer extends Thread {
        // 每次生产的产品数量
        private int num;
     
        // 所在放置的仓库
        private Storage storage;
     
        // 构造函数,设置仓库
        public Producer(Storage storage) {
            this.storage = storage;
        }
     
        // 线程run函数
        public void run() {
            produce(num);
        }
     
        // 调用仓库Storage的生产函数
        public void produce(int num) {
            storage.produce(num);
        }
     
        // get/set方法
        public int getNum() {
            return num;
        }
     
        public void setNum(int num) {
            this.num = num;
        }
     
        public Storage getStorage() {
            return storage;
        }
     
        public void setStorage(Storage storage) {
            this.storage = storage;
        }
    }
    

    Storage.java

    public class Storage {
        // 仓库最大存储量
        private final int MAX_SIZE = 100;
     
        // 仓库存储的载体
        private LinkedList<Object> list = new LinkedList<Object>();
     
        // 生产num个产品
        public void produce(int num) {
            // 同步代码段
            synchronized (list) {
                // 如果仓库剩余容量不足
                while (list.size() + num > MAX_SIZE) {
                    System.out.println("【要生产的产品数量】:" + num + "\t【库可以存放存量】:"
                            + list.size() + "\t暂时不能执行生产任务!");
                    try {
                        // 由于条件不满足,生产阻塞
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
     
                // 生产条件满足情况下,生产num个产品
                for (int i = 1; i <= num; ++i) {
                    list.add(new Object());
                }
     
                System.out.println("【已经生产产品数】:" + num + "\t【现仓储量为】:" + list.size());
                // 通知消费者来消费
                list.notifyAll();
            }
        }
     
        // 消费num个产品
        public void consume(int num) {
            // 同步代码段
            synchronized (list) {
                // 如果仓库存储量不足
                while (list.size() < num) {
                    System.out.println("【要消费的产品数量】:" + num + "\t【库存量】:"
                            + list.size() + "\t暂时不能执行生产任务!");
                    try {
                        // 由于条件不满足,消费阻塞
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
     
                // 消费条件满足情况下,消费num个产品
                for (int i = 1; i <= num; ++i) {
                    list.remove();
                }
     
                System.out.println("【已经消费产品数】:" + num + "\t【现仓储量为】:" + list.size());
                // 通知生产者生产
                list.notifyAll();
            }
        }
     
        // get/set方法
        public LinkedList<Object> getList() {
            return list;
        }
     
        public void setList(LinkedList<Object> list) {
            this.list = list;
        }
     
        public int getMAX_SIZE() {
            return MAX_SIZE;
        }
    }
    

    Test.java

    public class Test {
        public static void main(String[] args) {
            // 仓库对象
            Storage storage = new Storage();
     
            // 生产者对象
            Producer p1 = new Producer(storage);
            Producer p2 = new Producer(storage);
            Producer p3 = new Producer(storage);
            Producer p4 = new Producer(storage);
            Producer p5 = new Producer(storage);
            Producer p6 = new Producer(storage);
            Producer p7 = new Producer(storage);
     
            // 消费者对象
            Consumer c1 = new Consumer(storage);
            Consumer c2 = new Consumer(storage);
            Consumer c3 = new Consumer(storage);
     
            // 设置生产者产品生产数量
            p1.setNum(10);
            p2.setNum(10);
            p3.setNum(10);
            p4.setNum(10);
            p5.setNum(10);
            p6.setNum(10);
            p7.setNum(80);
     
            // 设置消费者产品消费数量
            c1.setNum(50);
            c2.setNum(20);
            c3.setNum(30);
     
            // 线程开始执行
            c1.start();
            c2.start();
            c3.start();
            p1.start();
            p2.start();
            p3.start();
            p4.start();
            p5.start();
            p6.start();
            p7.start();
        }
    }
    

    3 Thread.join()的使用

    如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才 从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(long millis,int nanos)两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。

    4 ThreadLocal的使用

    ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这 个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。

    可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。

    在代码清单4-15所示的例子中,构建了一个常用的Profiler类,它具有begin()和end()两个方法,而end()方法返回从begin()方法调用开始到end()方法被调用时的时间差,单位是毫秒。

    public class Profiler {
        // 第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次
        private static final ThreadLocal<Long> TIME_THREADLOCAL = 
                new ThreadLocal<Long>() {
            @Override
            protected Long initialValue() {
                return System.currentTimeMillis();
            }
        };
    
        public static void main(String[] args) throws InterruptedException {
            Profiler.begin();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("Time cost is: " + Profiler.end() + " mills");
        }
    
        public static final void begin() {
            TIME_THREADLOCAL.set(System.currentTimeMillis());
        }
    
        public static final long end() {
            // 时间消耗
            return System.currentTimeMillis() - TIME_THREADLOCAL.get();
        }
    }
    

    输出结果如下所示

    Cost: 1001 mills
    

    相关文章

      网友评论

          本文标题:四、Java线程间通信

          本文链接:https://www.haomeiwen.com/subject/mofyhqtx.html