美文网首页技术干货程序员
并发基础知识之线程间的协作

并发基础知识之线程间的协作

作者: 秃头哥编程 | 来源:发表于2018-05-18 14:00 被阅读37次

    多线程除了我们前面讲的竞争,其实还有协作。就像我们人一样,不但要竞争,也要学会合作,这样才能进步。这篇文章我们就讲讲多线程协作的基本机制wait/notify。同时使用多线程实现生产者/消费者模式。

    1.协作的场景

    多线程协作的场景有很多,比如:

    • 经典的生产者/消费者模式:生产者消费者通过共享队列实现协作,生产者往队列中放数据,消费者向队列中取数据,当队列满了的时候,生产者就不能再放了,当队列空了的时候,消费者就不能取了。

    • 同时开始:比如百米赛跑,所有的运动员必须等待裁判吹哨子之后才能开始跑。在一些程序,尤其是模拟仿真程序,要求多个线程同时开始。

    • 等待结束:主从协作模式也是一种比较常见的协作模式,主线程将任务分成若干小任务,为每个小任务创建一个线程,主线程必须等待所有子线程都运行结束后才能继续。

    • 集合点:比如班级去春游,在集合点必须等到所有同学都来齐了,才能去下一个旅游点,不能抛下任何一个同学,这是不负责任的。反映在程序中,比如并行迭代计算,每个线程负责一部分计算,然后在集合点等待其他线程完成,一起交付数据。


    2.wait/notify方法的介绍

    wait/notify方法是类Object中的方法,而Object又是所有类的父类,这样就使得所有的对象都可以调用这两个方法。

    主要有两个wait方法

    public final void wait() throws InterruptedException
    public final void wait(long timeout) throws InterruptedException
    

    不带参数或者参数为0,表示无限期等待。

    同时这里说一下wait和sleep的区别:wait会释放对象锁,而且不能主动唤醒,需要其他线程去唤醒它。而sleep不会释放对象锁,且到了设置的时间就会主动唤醒。

    前面的文章中有说过,每个对象都有一个锁和一个等待队列,当一个线程尝试去获取对象的锁失败时,就会加入该对象的等待队列。其实对象还有另一个队列,叫条件队列,专门用于线程间的协作。

    当一个线程调用wait方法后就会把当前线程加入条件队列中并阻塞,等待一个线程去把它唤醒。唤醒使用notify方法,主要有下面形式:

    public final void notify()
    public final void notifyAll()
    

    notify方法是在条件队列中随机选择一个线程,将其从条件队列中移除并唤醒。notifyAll方法,顾名思义,就是把条件队列中的线程都唤醒。

    wait的具体过程如下

    1.把当前线程放入条件队列,释放对象锁,线程状态变为WAITING或TIMED_WAITING。

    2.等待时间到或者其他线程调用notify/notifyAll方法唤醒,从条件队列中移除,但要重新竞争锁

    3.如果获得了对象锁,则线程状态变为RUNNABLE,并从wait方法调用中返回。否则进入等待队列,线程状态变为BLOCKED,只有获得锁之后才从wait方法调用中返回。

    说明:线程被唤醒不一定就能立刻获得锁。

    ps:wait/notify方法只能在synchronized块中被调用。如果调用wait/notify方法时,当前线程没有获得对象锁,则会抛出异常。


    3.生产者/消费者模式

    生产者/消费者模式中,协作的共享变量是队列,队列满了,则生产者就wait,队列空了,则消费者就wait。

    /**
     * 生产者消费者的共享队列
     */
    public class MyBlockingQueue<E> {
        private int length;
        private Queue<E> queue;
        public MyBlockingQueue(int length) {
            this.length = length;
            queue = new ArrayDeque<E>(length);
        }
    
        public synchronized void put(E e) throws InterruptedException {
            while(queue.size() == length) {
                wait();
            }
            queue.add(e);
            notifyAll();
        }
    
        public synchronized E get() throws InterruptedException {
            while(queue.isEmpty()) {
                wait();
            }
            E e = queue.poll();
            notifyAll();
            return e;
        }
    }
    

    该类是生产者消费者的共享队列,有两个方法,分别是生产者放数据的put方法,以及消费者取数据的get方法。都加了synchronized进行修饰。两个方法中都使用了wait方法,等待的条件不一样,但会加入相同的等待条件队列,所以这里要使用notifyAll方法,因为notify只能唤醒一个线程,如果唤醒的是同类线程那就完蛋了。

    只能有一个条件等待队列,这是wait/notify机制的局限性。

    /**
     * 生产者线程
     */
    public class Producer extends Thread {
        MyBlockingQueue<String> queue;
        public Producer(MyBlockingQueue<String> queue) {
            this.queue = queue;
        }
        
        @Override
        public void run() {
            int num = 0;
            try {
                while(true) {
                    String value = String.valueOf(num);
                    queue.put(value);
                    System.out.println("producer put " + value);
                    num++;
                    Thread.sleep((int)(Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
    }
    
    /**
     * 消费者线程
     */
    public class Consumer extends Thread {
        MyBlockingQueue<String> queue;
        public Consumer(MyBlockingQueue<String> queue) {
            this.queue = queue;
        }
        
        @Override
        public void run() {
            try {
                while(true) {
                    String value = queue.get();
                    System.out.println("Consumer get " + value);
                    Thread.sleep((int)(Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
    }
    

    搞一个主程序运行

    public class ProducerAndConsumer {
        public static void main(String[] args) {
            MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
            new Producer(queue).start();
            new Consumer(queue).start();
        }
    }
    

    运行后会交替的打印生产者线程和消费者线程的存取信息。

    这里我们使用了ArrayDeque,Java提供了专门的阻塞队列实现

    • 接口BlockingQueue和BlockingDeque
    • 基于数组的实现类ArrayBlockingQueue
    • 基于链表的实现类LinkedBlockingQueue和LinkedBlockingDeque
    • 基于堆的实现类PriorityBlockingQueue

    在实际开发中,应优先使用这些类。

    相关文章

      网友评论

      本文标题:并发基础知识之线程间的协作

      本文链接:https://www.haomeiwen.com/subject/sacgdftx.html