1、等待/通知机制
线程间通讯可以采用while语句轮询检测某一条件,缺点是线程主动操作、读取、判断同一个变量,此方法浪费CPU资源。
1.1 等待/通知机制的实现
- wait()方法和notify()方法
1、要在同步方法或同步代码块中调用,即必须在调用前先获取该对象锁,若没有会抛出IllegalMonitorStateException。
2、执行wait()方法后,线程停止运行并释放锁,后续代码不执行;
3、notify()方法用来通知呈wait状态的线程,随机挑选一个对其发送通知并使其转为就绪状态,获取对象锁并执行后续代码。执行notify()方法不会立即释放锁,需要等待线程将程序执行完成。
4、notifyAll()方法可以通知所有在等待同一资源的线程全部进入就绪状态。
public class MyList
{
//此处必须将变量定义为static,方法定义为public方便访问
private static List list = new ArrayList();
public static void add()
{
list.add("myString");
}
public static int size()
{
return list.size();
}
}
public class ThreadA extends Thread
{
private Object lock;
public ThreadA(Object lock)
{
super();
this.lock = lock;
}
@Override
public void run()
{
try
{
synchronized(lock)
{
if(MyList.size() != 5)
{
System.out.println("wait begin");
lock.wait();
System.out.println("wait end");
}
}
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
public class ThreadB extends Thread
{
private Object lock;
public ThreadB(Object lock)
{
super();
this.lock = lock;
}
@Override
public void run()
{
try
{
synchronized(lock)
{
for(int i=0; i < 10; I++)
{
MyList.add();
if(MyList.size() == 5)
{
System.out.println("notify begin");
lock.notify();
System.out.println("notify end");
}
System.out.println("add 第" + (i+1) + "个元素");
}
}
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
public class Run
{
public static void main(String[] args)
{
try
{
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(lock);
b.start();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
- 每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列,就绪队列存储将要获取锁的线程,阻塞队列存储被阻塞的线程。
- wait()方法被执行后,锁被自动释放,执行完notify()方法锁不会立即释放,等待notify()方法所在的代码块代码全部执行完成。
- wait(long)方法:等待某一段时间内是否有线程对锁进行唤醒,超过这个时间线程会自动唤醒。
1.2 生产者/消费者模式的实现
- 一生产者赋值,一消费者读取
public class Consumer
{
private Object lock;
public Consumer(Object lock)
{
this.lock = lock;
}
public void getValue()
{
try
{
synchronized (lock)
{
if(Value.getValue().equals(""))
{
lock.wait();
}
System.out.println("Consumer get " + Value.getValue());
Value.setValue("");
lock.notify();
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public class Produce
{
private Object lock;
public Produce(Object lock)
{
this.lock = lock;
}
public void setValue()
{
try
{
synchronized (lock)
{
if(!Value.getValue().equals(""))
{
lock.wait();
}
String value = System.currentTimeMillis()+"";
System.out.println("Produce set " + value);
Value.setValue(value);
lock.notify();
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public class ConsumerThread extends Thread
{
private Consumer consumer;
public ConsumerThread(Consumer consumer)
{
super();
this.consumer = consumer;
}
@Override
public void run()
{
while(true)
{
consumer.getValue();
}
}
}
public class ProduceThread extends Thread
{
private Produce produce;
public ProduceThread(Produce produce)
{
super();
this.produce = produce;
}
@Override
public void run()
{
while(true)
{
produce.setValue();
}
}
}
public class Value
{
//设置初值
private static String value = "";
public static void setValue(String valueSet)
{
value = valueSet;
}
public static String getValue()
{
return value;
}
}
public class Main
{
public static void main(String[] args)
{
Object lock = new Object();
Produce produce = new Produce(lock);
ProduceThread produceThread = new ProduceThread(produce);
Consumer consumer = new Consumer(lock);
ConsumerThread consumerThread = new ConsumerThread(consumer);
produceThread.start();
consumerThread.start();
}
}
运作机制:
以value值作为切入点,当value值为""时,消费者线程为wait状态,生产者获取锁对value赋值,完成赋值后notify并释放锁,消费者被唤醒,获取生产者给value赋的值,并将value值设置回"",notify并释放锁。
- 多生产者/多消费者实现
假死:因为线程notify的对象有可能是异类,也有可能是同类,连续多次唤醒同类,这就有可能导致所有的线程都进入waiting状态,从而任务无法正常执行即假死。
解决方法:使用notifyAll()方法
public class Stack
{
private List list = new ArrayList();
synchronized public void push()
{
try
{
//此处由if改为while,条件改变时无法得到及时响应
//唤醒多个wait状态的线程,pop中的remove出错。
while(list.size() == 1)
{
this.wait();
}
list.add("anyString"));
this.notify();
System.out.println("push " + list.size());
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
synchronized public void pop()
{
try
{
while(list.size() != 1)
{
this.wait();
}
list.remove(0);
//通知同类和异类
this.notifyAll();
System.out.println("pop" + list.size());
}
}
}
*交叉备份实现
public class BackUpTool
{
private boolean backToA = false;
synchronized public void backUpA()
{
try
{
while (backToA == false)
{
wait();
}
System.out.println("Backup to A");
backToA = false;
notifyAll();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
synchronized public void backUpB()
{
try
{
while (backToA == true)
{
wait();
}
System.out.println("Backup to B");
backToA = true;
notifyAll();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
public class ThreadA extends Thread
{
private BackUpTool backUpTool;
public ThreadA(BackUpTool tool)
{
super();
this.backUpTool = tool;
}
@Override
public void run()
{
backUpTool.backUpA();
}
}
public class ThreadB extends Thread
{
private BackUpTool backUpTool;
public ThreadB(BackUpTool tool)
{
super();
this.backUpTool = tool;
}
@Override
public void run()
{
backUpTool.backUpB();
}
}
public class Run
{
public static void main(String[] args)
{
BackUpTool backUpTool = new BackUpTool();
for(int i = 0; i < 20; i++)
{
ThreadA threadA = new ThreadA(backUpTool);
ThreadB threadB = new ThreadB(backUpTool);
threadA.start();
threadB.start();
}
}
}
2、通过管道进行线程间通信
管道流(pipeStream)是一种特殊的流,用于不同线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道读取数据。
字节流 PipedInputStream、PipedOutputStream
字符流 PipedReader、PipedWriter
import java.io.IOException;
import java.io.PipedOutputStream;
public class WriteData
{
public void wirteMethod(PipedOutputStream out)
{
try
{
System.out.println("Write:");
for(int i=0; i<300;i++)
{
String outData = "" + (i+1);
//0到299写入到输出管道
out.write(outData.getBytes());
System.out.print(outData);
}
System.out.println();
out.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
import java.io.PipedOutputStream;
public class ThreadWrite extends Thread
{
private WriteData writeData;
private PipedOutputStream out;
public ThreadWrite(WriteData writeData, PipedOutputStream out)
{
super();
this.writeData = writeData;
this.out = out;
}
@Override
public void run()
{
writeData.wirteMethod(out);
}
}
import java.io.IOException;
import java.io.PipedInputStream;
public class ReadData
{
public void readMethod(PipedInputStream input)
{
try
{
System.out.println("Read:");
byte[] byteArray = new byte[20];
//到输入管道里读取数据到byteArray数组中
//返回数组大小
int readLength = input.read(byteArray);
while(readLength != -1)
{
String newData = new String(byteArray,0,readLength);
System.out.println(newData);
readLength = input.read(byteArray);
}
System.out.println();
input.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
import java.io.PipedInputStream;
public class ThreadReader extends Thread
{
private ReadData readData;
private PipedInputStream input;
public ThreadReader(ReadData readData, PipedInputStream input)
{
super();
this.readData = readData;
this.input = input;
}
@Override
public void run()
{
readData.readMethod(input);
}
}
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Run
{
public static void main(String[] args)
{
try
{
WriteData writeData = new WriteData();
ReadData readData = new ReadData();
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
inputStream.connect(outputStream);
ThreadReader threadReader = new ThreadReader(readData, inputStream);
ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
threadWrite.start();
Thread.sleep(2000);
threadReader.start();
}
catch (IOException e)
{
e.printStackTrace();
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
3、join()方法
3.1 join()方法使用方式
class Run
{
public static void main(String[] args)
{
Thread thread = new Thread();
thread.start();
//加上join语句可以保证在thread线程结束后再执行打印end操作
thread.join();
System.out.println("end");
}
}
方法join()的作用是使所属的线程对象thread正常执行完run()方法中的任务,而使当前的线程main进行无限期的阻塞,等待线程thread销毁后再继续执行线程main后面的代码。
3.2 join(long)与sleep(long)的区别
方法join(long)的功能在内部使用wait(long)来实现的,因此具有释放锁的特点;而sleep(long)方法不是释放锁。
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
4、类ThreadLocal作用
每个线程绑定自己的值,ThreadLocal类可以存储每个线程的私有数据。每个线程可以获取到各自设置的值,互不影响,隔离性。
public class Tools
{
public static ThreadLocal t1 = new ThreadLocal();
}
public class ThreadA extends Thread
{
@Override
public void run()
{
try
{
for(int i = 0; i < 100; i++)
{
Tools.t1.set("ThreadA" + (i+1));
System.out.println("ThreadA get " + Tools.t1.get());
Thread.sleep(1000);
}
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
public class ThreadB extends Thread
{
@Override
public void run()
{
try
{
for(int i = 0; i < 100; i++)
{
Tools.t1.set("ThreadB" + (i+1));
System.out.println("ThreadB get " + Tools.t1.get());
Thread.sleep(1000);
}
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
public class Run
{
public static void main(String[] args)
{
try
{
ThreadA threadA = new Thread();
ThreadB threadB = new Thread();
threadA.start();
threadB.start();
for(int i = 0; i < 100; i++)
{
Tools.t1.set("Main" + (i+1));
System.out.println("Main get " + Tools.t1.get());
Thread.sleep(1000);
}
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
InteritableThreadLocal类可以让子线程从父类线程中取得值。
网友评论