美文网首页java专题
Java线程协作基础 wait/notify+await/sig

Java线程协作基础 wait/notify+await/sig

作者: entro | 来源:发表于2019-05-16 14:47 被阅读9次

Java线程协作

[TOC]

线程协作的基本方法:wait/notify/notifyAll和await/signal/signalAll。

一、wait(long timeout)/notify/notifyAll

除了用于锁的锁池(entry set),每个对象(Object)还有另一个等待池(wait set)用于线程间协作,表示等待一个条件,这个条件自己改变不了,需要等待其他线程改变条件后唤醒当前线程。
调用Object的wait方法会将当前线程放入wait set并阻塞。
使用wait的注意事项:

  1. wait/notify/notifyAll必须包裹在 synchronized 同步块中。
  2. 多线程 synchronized 同步的必须是一个相同的对象。

wait/notify的两个问题

  1. wait 应该包裹在while(true){}循环中,而不是if(true)。

因为:如果if用不能重复判断条件,while则会重新判断条件执行。

  1. 多线程竞争的情况下应该使用notifyAll而不是notify否则可能造成死循环。

因为:wait/notify的同步代码块 synchronized 是一个非公平锁实现。
如果一次只唤醒一个线程,可能造成达不到条件的两个线程相互调用的死循环。
比如在消费者/生产者场景下,如果存在多个消费者和生产者,需要唤醒生产者的时候,可能会造成两个消费者互相唤醒的死循环。

生产者消费者场景的实现,可以通过在生产者消费者synchronized同一个容器来实现,在生产者消费者内部来使用wait/notify实现协作。

也可以通过一个阻塞队列来实现,阻塞队列的底层实现一般是采用另外一种机制:await/signal/signalAll

二、await/signal/signalAll

wait/notify属于Object的方法,配合 synchronized(JVM方法)使用。
而await/signal/signalAll是Condition的方法,配合Lock的子(jdk方法)类(如ReentrantLock)使用。

另外一个可以让线程阻塞的方法是sleep(Thread的方法)方法。调用wait方法需要先获得锁,而调用sleep方法是不需要的。
如果已经持有锁定,sleep不会释放锁,而wait会。

这三者的异同简单对比如下:

方法 谁的方法 配合使用 持有锁时是否释放锁 wait set
wait/notify Object synchronized Y unfair
await/signal Condition Lock接口的子类 Y 当Lock是fair则wait set是fair
sleep Thread 任何地方 N -

await和wait的作用的区别有一小点:wait只能在synchronized同步的对象中使用一个队列。
而await对应的Lock子类以newCondition多个,也就是一个锁有多个等待队列,相当于一个房子安装了多个门。

Condition接口支持把在condition queue上等待的线程 设置为fair或unfair, 当且仅当Lock是fair的,它生成的Condition对象才是fair的。

synchronized 块或方法对于一个对象只有一个condition queue。这样如果在这个queue上如果有多个condition predicate, 比如isFull(),isEmpty() ,
就必须用notifyAll()方法, 会有context switch及获取锁的性能损失。

三、协作场景

主要有五个场景:
1 生产者/消费者模式

生产者与消费者通过共享队列协作,生产者生产产品放入队列中,消费者从队列中消费产品。
如果队列长度有限,队列满的时候生产者需要等待,队列空的时候消费者需要等待。

2 同时开始

类似于运动员比赛,同时开始,常见于模拟仿真程序中。

3 等待结束(主从协作)

主从协作模式的一种情况:主线程将任务分解为若干子任务,为每个子任务创建线程,主线程继续需要等待所有子线程执行完毕。

4 异步结果(主从协作)

主从协作模式的一种情况:主线程将子线程的调用封装成异步调用,调用后马上返回Future对象,后续通过Future获取最终结果。

5 集合点

类似于团队旅游,过程中设置若干集合点,集合点集合完毕后进行后续活动。
比如在并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程到齐后交换数据计算结果后进行下一次迭代。

3.1 生产者/消费者模式

有两种方法。

3.1.2 第一种方法:在每个线程中使用协作。

在生产者/消费者的线程中通过判断队列的状态,自主的使用wait/notify.

这里的生产/消费过程while包裹在synchronized内,所以生产的时候由一个生产者生产到满,由一个消费者消费到0这种比较规律的现象。

第二种实现方法(共享阻塞队列)实现的方式,生产/消费过程synchronized被while包裹,因而每次生产消费后都会释放锁,所以出现了比较频繁随机的竞争现象。

消费者和生产者的核心代码:

synchronized (linkedList){//消费者线程核心代码
    try {
        while (true) {
                if (linkedList.isEmpty()) {
                    OS.print(String.format("%s:缓存队列为空,通知生产者生产(notifyAll),暂停消费(wait)",getName()));
                    linkedList.notifyAll();
                    linkedList.wait();
                } else {
                    String rs = linkedList.poll();
                    OS.print(String.format("%s:消费产品:%s,消费后缓存队列size:%s",getName(),rs,linkedList.size()));
                    Thread.sleep(200);
                }
        }
    } catch(InterruptedException e){
        e.printStackTrace();
    }
}
try {//生产者线程核心代码
    synchronized (linkedList){
        int i = 0;
        while(true){
            if(linkedList.size()==max){
                OS.print(String.format("%s:缓存队列满,通知消费者启动(notifyAll),暂停生产(wait)。", getName()));
                linkedList.notifyAll();
                linkedList.wait();
            }else{
                linkedList.offer(getName()+(++i));
                OS.print(String.format("%s:继续生产:%s,队列当前长度(%s)", getName(),getName()+i, linkedList.size()));
                Thread.sleep(500);

            }
        }
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

这里的例子使用的是await/notify,也可以用await/signal来实现,具体实现大同小异。

4.1.2 在共享阻塞队列中实现协作

即消费者线程和生产者线程通过阻塞队列来实现协作。java提供的阻塞队列如 ArrayBlockingQueue 等底层都是通过await机制实现的。
第一种实现便于理解wait的原理,但是第二种实现代码更整洁一些,后续的场景只举例使用第二种实现来实现。

4.2 同时开始(实现最大的并行性):闭锁CountDownLatch也可以实现

主线程充当裁判员,每个子线程充当一个运动员。就位后,运动员子线程等待协作的共享变量是做为起跑信号。同样有两种实现。这里举例使用第二种简洁的实现:
一个起跑信号类FireFlag。

public class FireFlag {
    private volatile boolean fireFlag = false;
    public synchronized void wait4Fire() throws InterruptedException {
        while (!fireFlag){ wait(); }
    }
    public synchronized void fire(){
        this.fireFlag = true;
        notifyAll();
    }
}

运动员类,start后调用wait4Fire准备就绪:

public class Racer extends Thread{
    private FireFlag fireFlag;
    Racer(FireFlag fireFlag) {
        this.fireFlag = fireFlag;
    }
    @Override
    public void run() {
        try {
            fireFlag.wait4Fire();
            System.out.println(String.format("%s:开跑(%s)",getName(),System.nanoTime()+""));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

然后裁判员主线程,通过调用fire方法开枪,运动员线程开始同时起跑。

这里的FireFlag使用闭锁CountDownLatch也可以实现,初始值设为1,所有运动员线程调用CountDownLatch的await()方法,主线程调用CountDownLatch的countDown(),计数器变成0,相当于裁判员开枪。

4.3 等待结束(主从协作) 及同步协作类:CountDownLatch

主从协作的一种情况,Thread的join方法就是一种等待结束,底层是实现利用了wait():

while(isAlive){
    wait(0);
}

join的等待是线性的,需要逐一等待每个子线程。

public class Run {
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[100];
        for (int i = 0;i<100;i++){
            threads[i] = new JoinThread("Thread"+i,(int)(Math.random()*1000));
            threads[i].start();
        }
        //join必须单独循环一次,否则100个子线程就变成线性执行了
        //但是主线程的等待还是逐个等待的。一些join可能是在线程执行完毕后调用的。
        for (int i=0;i<100;i++){
            threads[i].join();
        }
        System.out.println("其他线程都执行完毕了");
    }
}

这里举个并发执行的例子,主线程与子线程写作共享一个数count(WaitCount对象)来表示未完成的线程个数。
每执行完毕一个减一,当count==0时调用notify(此时只剩下当前线程和主线程,可以使用notify)通知主线程停止wait。
WaitCountDownLatch对象代码如下:

public class WaitCountDownLatch {
    private int count;
    WaitCountDownLatch(int count) {
        this.count = count;
    }
    
    /**
     * 阻塞等待计数器==0
     * @throws InterruptedException
     */
    public synchronized void await() throws InterruptedException {
        while (count>0){
            wait();
        }
        System.out.println(String.format("%s:count==0了,所有线程执行完毕了",Thread.currentThread().getName()));
    }

    /**
     * 任务执行完毕后计数器减一
     */
    public synchronized void countDown(){
        count--;
        System.out.println(String.format("%s:结束,count(%s)数减一",Thread.currentThread().getName(),count));
        if(count == 0){
            notify();
        }
    }
}

WaitCountDownLatch 是一个用于同步协作的工具类,用于演示原理。

jdk中提供了一个专门的同步类CountDownLatch,实际开发中应该使用这个类。
使用用法和这里的WaitCountDownLatch类似,也提供了await()countDown()这两个方法。

100个子线程共用一个WaitCountDownLatch对象,在run方法中调用中调用countDown()让计数器减一,主线程中启动所有线程后调用await()

4.4 异步结果(主从协作):Executor Future

主从协作的情况下,手工创建线程往往比较麻烦,一种常见的模式是异步调用。

异步调用一般返回一个Future对象,通过它可以获得最终的结果。

在java中表示子任务的接口是Callable。

异步结果的主要逻辑在Executor的execute()方法中:
创建callable线程,返回Future对象。Future的get()方法的逻辑是,阻塞等待任务线程结束,结束后返回结果。

这里有一个自己实现的Executor的execute()方法,包含Future的get()的实现的源码,供参考原理:

public class MyExecutor {
    public <V> MyFuture<V> execute(final Callable<V> task){
        final Object lock = new Object();
        final ExecuteThread<V> thread = new ExecuteThread<>(task,lock);
        thread.start();

        return () -> {
            synchronized (lock){
                while (!thread.isDone()){
                    lock.wait();
                }
            }
            if(thread.getException() != null){
                throw thread.getException();
            }
            return thread.getResult();
        };
    }
}

4.5 集合点:栅栏 CyclicBarrier

各个线程分头行动各自到达一个集合点,集合点等待所有线程到齐,交换数据后进行下一步动作。
这里举一个具体的例子:公司部门10人约定去爬山,各自从家中出发,到山底集合,所有人到齐开始爬山。

每个员工对应一个线程:

public class ClimberThread extends Thread {
    private CyclicBarrier cyclicBarrier;
    private int sleep;

    public ClimberThread(String threadName, int sleep, CyclicBarrier cyclicBarrier) {
        setName(threadName);
        this.cyclicBarrier = cyclicBarrier;
        this.sleep = sleep;
    }

    @Override
    public void run() {
        System.out.println(String.format("%s:我出发了",getName()));
        try {
            sleep(sleep);
            System.out.println(String.format("%s:我已经到达山底了,只到了(%s)人,我先等一下。",getName(),cyclicBarrier.getNumberWaiting()));
            cyclicBarrier.await();
            System.out.println(String.format("%s:所有人到齐了,开始爬山",getName()));
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

主线程模拟10个员工从家中出发:

public class Run {
    public static void main(String[] args) {
        int climberCount = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(climberCount);
        for (int i = 0; i < climberCount; i++){
            new ClimberThread("climber"+i,(int)(Math.random()*1000),cyclicBarrier).start();
        }
        System.out.println("攀登者们陆续从家里出发了……");
    }
}

相关文章

网友评论

    本文标题:Java线程协作基础 wait/notify+await/sig

    本文链接:https://www.haomeiwen.com/subject/jadiaqtx.html