当多个线程一起协调完成一件事时,那么线程之间的沟通就显得尤为重要。
1. 假设线程A,线程B,两个线程分别打印1-3三个数字。
private static void ThreadDemo1(){
Thread A = new Thread(new Runnable() {
public void run() {
printNum("A");
}
});
Thread B = new Thread(new Runnable() {
public void run() {
printNum("B");
}
});
A.start();
B.start();
}
//数字打印
private static void printNum(String threadname){
int i = 0;
while ( i++ < 3 ){
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println( threadname + "print:" + i );
}
}
public static void main(String[] args) {
ThreadDemo1();
}
运行得到的结果:
Aprint:1
Bprint:1
Bprint:2
Aprint:2
Aprint:3
Bprint:3
分析:
可以看出线程A和线程B是同时运行的,且顺序是随机的,每次运行得到的结果可能不一样。
2. 如何让A、B两个线程依次执行?
如果希望线程B在线程A全部打印完成后再开始打印。可以利用thread.join()方法。
private static void ThreadDemo2(){
final Thread A = new Thread(new Runnable() {
public void run() {
printNum("A");
}
});
Thread B = new Thread(new Runnable() {
public void run() {
System.out.println("B 开始等待 A");
try{
A.join();
}catch(InterruptedException e){
e.printStackTrace();
}
printNum("B");
}
});
B.start();
A.start();
}
private static void printNum(String threadname){
int i = 0;
while ( i++ < 3 ){
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println( threadname + "print:" + i );
}
}
public static void main(String[] args) {
ThreadDemo2();
}
运行得到的结果:
B 开始等待 A
Aprint:1
Aprint:2
Aprint:3
Bprint:1
Bprint:2
Bprint:3
分析:
Thread.join
含义:
简单来说,就是线程没有执行完之前,会一直阻塞在join方法处。
join对于线程的作用.png
Thread.join源码
public class Thread implements Runnable {
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {//判断是否携带阻塞的超时时间,等于0表示没有设置超时时间
while (isAlive()) {//isAlive获取线程状态,无线等待直到AThread线程结束
wait(0); //调用Object中的wait方法实现线程的阻塞
}
} else {//阻塞直到超时
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
}
总结:Thread.join其实底层是通过wait/notifyall来实现线程的通信达到线程阻塞的目的;当线程执行结束以后,会触发两个事情,第一个是设置native线程对象为null、第二个是通过notifyall方法,让等待在A对象锁上的wait方法被唤醒。
3. 那如何让 A、B两个线程按照指定方式有序交叉运行呢?
希望 A 在打印完 1 后,再让 B 打印 1, 2, 3,最后再回到 A 继续打印 2, 3。这种需求下,显然 Thread.join() 已经不能满足了。我们需要更细粒度的锁来控制执行顺序。
这里,我们可以利用 object.wait() 和 object.notify() 两个方法来实现。
private static void ThreadDemo3(){
final Object lock = new Object();
Thread A = new Thread(new Runnable() {
public void run() {
synchronized(lock){
System.out.println("Aprint: 1");
try{
System.out.println("Aprint: wait");
lock.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("Aprint: 2");
System.out.println("Aprint: 3");
}
}
});
Thread B = new Thread(new Runnable() {
public void run() {
synchronized (lock) {
System.out.println("Bprint: 1");
System.out.println("Bprint: 2");
System.out.println("Bprint: 3");
lock.notify();
}
}
});
A.start();
B.start();
}
public static void main(String[] args) {
ThreadDemo3();
}
运行得到的结果:
Aprint: 1
Aprint: wait
Bprint: 1
Bprint: 2
Bprint: 3
Aprint: 2
Aprint: 3
分析:
- 首先建立一个A和B共享的对象锁 lock = new Object();
- 当A得到锁后,先打印1,然后调用 lock.wait()方法,交出锁的控制权,进入wait状态;
- 对B而言,由于A最开始得到了锁,导致B无法执行;直到A调用 lock.wait()释放控制权后,B才得到了锁;
- 在B得到锁后,B开始打印1,2,3;然后调用 lock.notify()方法,唤醒正在wait的A;
- A被唤醒后,继续打印剩下的2,3。
wait():使调用该方法的线程释放共享资源锁,然后从运行状态退出,进入等待队列,直到被再次唤醒。
notify():随机唤醒等待队列中等待同一共享资源的一个线程,并使该线程退出等待队列,进入可运行状态,也就是notify()方法仅通知一个线程。
4. 四个线程 A B C D,其中 D 要等到 A B C 全执行完毕后才执行,而且 A B C 是同步运行的。
如果使用 thread.join(),可以让一个线程等另一个线程运行完毕后再继续执行,那我们可以在 D 线程里依次 join A B C,不过这也就使得 A B C 必须依次执行,而我们要的是这三者能同步运行。
我们希望达到的目的是:A B C 三个线程同时运行,各自独立运行完后通知 D;对 D 而言,只要 A B C 都运行完了,D 再开始运行。针对这种情况,我们可以利用 CountdownLatch 来实现这类通信方式。它的基本用法是:
- 创建一个计数器,设置初始值,CountDownLatch countDownLatch = new CountDownLatch(2);
- 在等待线程里调用countDownLatch.await()方法,进入等待状态,直到计数器变成0;
- 在其他线程里,调用countDownLatch.countDown()方法,该方法会将计数值减少1;
- 在其他线程里countDown()方法把计数值变成0时,等待线程里的countDownLatch.await()立即退出,继续执行下面的代码。
private static void runDAfterABC() {
int worker = 3;
final CountDownLatch countDownLatch = new CountDownLatch(worker);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("D is waiting for ABC");
try {
countDownLatch.await();
System.out.println("All done, D starts working");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
for (char threadName='A'; threadName <= 'C'; threadName++) {
final String tN = String.valueOf(threadName);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(tN + " is working");
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(tN + " finished");
countDownLatch.countDown();
}
}).start();
}
}
public static void main(String[] args) {
runDAfterABC();
}
运行结果:
D is waiting for ABC
A is working
C is working
B is working
C finished
B finished
A finished
All done, D starts working
分析:
CountDownLatch可以用来倒计时,但当计数完毕,只有一个线程的await()会得到响应,无法让多个线程同时触发。 【CountDownLatch 适用于一个线程去等待多个线程的情况。】
5. 线程 A B C 各自开始准备,直到三者都准备完毕,然后再同时运行 。
为了实现线程间互相等待这种需求,我们可以利用 CyclicBarrier 数据结构。
它的基本用法是:
- 先创建一个公共CyclicBarrier对象,设置同时等待的线程数,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
- 这些线程同时开始自己做准备,自身准备完毕后,需要等待别人准备完毕,这是调用cyclicBarrier.await(),即可开始等待别人;
- 当指定的同时等待的线程数都调用了cyclicBarrier.await()时,意味着这些线程都准备完毕,然后这些线程才同时继续执行。
public static void runABCWhenAllReady(){
int runner = 3;
final CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
final Random random = new Random();
for ( char runnername = 'A' ;runnername <= 'C' ; runnername++ ){
final String rN = String.valueOf(runnername);
new Thread(new Runnable(){
@Override
public void run() {
long prepareTime = random.nextInt(10000)+100;
System.out.println(rN + " is preparing for time:"+prepareTime);
try{
Thread.sleep(prepareTime);
}catch(Exception e){
e.printStackTrace();
}
try{
System.out.println(rN + " is preparing,waiting for others");
//当前线程准备完毕后,等待别人准备好
cyclicBarrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
//所有线程都准备好了,准备一起跑
System.out.println(rN+" starts running");
}
}).start();;
}
}
public static void main(String[] args) {
runABCWhenAllReady();
}
运行结果:
A is preparing for time:1150
C is preparing for time:10096
B is preparing for time:4127
A is preparing,waiting for others
B is preparing,waiting for others
C is preparing,waiting for others
C starts running
A starts running
B starts running
6. 子线程完成某件任务后,把得到的结果回传给主线程。
可以利用接口类Callable
private static void doTaskWithResultInWorker() {
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Task starts");
Thread.sleep(1000);
int result = 0;
for (int i=0; i<=100; i++) {
result += i;
}
System.out.println("Task finished and return result");
return result;
}
};
FutureTask<Integer> futureTask = new FutureTask(callable);
new Thread(futureTask).start();
try {
System.out.println("Before futureTask.get()");
System.out.println("Result: " + futureTask.get());
System.out.println("After futureTask.get()");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
doTaskWithResultInWorker();
}
运行结果:
Before futureTask.get()
Task starts
Task finished and return result
Result: 5050
After futureTask.get()
分析:
Runnable源码
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
Callable源码
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable与Runnable最大的区别就是返回一泛型V结果。
那么,如何把子线程的结果回传回来呢?在Java里,有个类配合Callable使用:FutureTask,不过需注意,它获取结果的get方法会阻塞主线程。
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
}
从运行结果可以看出,主线程调用futureTask.get()方法时阻塞主线程;然后Callable内部开始执行,并返回运行结果;此时futureTask.get()得到结果,主线程恢复运行。
通过FutureTask和Callable可以直接在主线程获得子线程对的运算结果,只不过需要阻塞主线程。当然不希望阻塞主线程,可以考虑利用ExecutorService,把FutureTask放到线程池去管理执行。
网友评论