线程串行并行调度实现
问题描述
问题描述:线程A、B、C并行执行,然后和线程D串行执行,如何实现。
问题具体化:现在有A、B、C三个线程,每一个线程分别完成打印0~4的任务,有一个线程D,在A、B、C完成打印之后打印“Hello world!”。
解决思路
我们都知道线程就是用来实现并发执行的,而要实现的结果是在A、B、C三个线程并行执行完之后才执行线程D的任务,那么就需要获取等到A、B、C三个线程的结果,之后再调用线程D的执行方法,就需要想办法让线程D暂不执行,等待A、B、C三个线程执行完才执行,根据这个思路,我们可以想到的解决方案方法如下:
- 通过一些flag变量控制判断A、B、C三个线程是否执行完
这是一个理解起来最简单也是最原始的方法,通过不断循环判断A、B、C三个线程对应的标识符是否完成执行来决定是否该执行线程D。
- 在线程D执行前阻塞线程,等待A、B、C三个线程执行完在执行
这个思路和上面差不多,只不过一个是循环判断,一个是阻塞线程。阻塞线程的方法有很多,sleep了,上锁了之类的,但是我们除了阻塞线程外,还需要有一个时机去唤醒阻塞,这个时机的触发点就是A、B、C三个线程执行完,综合考虑,能满足这样条件的阻塞方式有以下几种:
1、thread.join() 阻塞等待当前线程执行完
2、利用FeatureTask实现的线程,通过get()方法可以阻塞等待线程结果返回
3、CountDownLatch,通过闭锁计数器的方式,通过await方法阻塞线程,在A、B、C三个线程执行完之后减少计数,唤醒阻塞
4、CyclicBarrier,栅栏原理,通过其await方法阻塞线程,在A、B、C三个线程执行完之后触发到达栅栏数,唤醒阻塞
以上就是能想到的实现方式,接下来一个一个用代码的方式实现并简要介绍其原理和特性。
解决方案及原理分析
通过flag变量控制
- 代码实现
//线程类
class MyRunnable implements Runnable {
Flag flag;//变量
CountDownLatch countDownLatch;//闭锁
CyclicBarrier cyclicBarrier;//栅栏
/**
* 变量控制的方法
*
* @param flag
*/
public MyRunnable(Flag flag) {
this.flag = flag;
}
/**
* 通过其他阻塞的方法时调用
*/
public MyRunnable() {
}
/**
* 通过CountDownLatch的方式
*/
public MyRunnable(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
/**
* 通过CyclicBarrier的方式
*/
public MyRunnable(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
}
if (countDownLatch != null) {//通过闭锁方式
countDownLatch.countDown();
}
if (cyclicBarrier != null) {//通过栅栏方式
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
if (flag != null) {//通过flag变量方式
flag.setFlag(true);
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + flag.isFlag());
}
}
}
class Flag {
boolean flag = false;
public void setFlag(boolean flag) {
this.flag = flag;
}
public boolean isFlag() {
return flag;
}
}
/**
* 线程调度方法1 flag变量控制
* Created by anonyper on 2019/10/17.
*/
public class ThreadTest {
Flag flagA = new Flag();//考虑一下这里为什么用一个对象包装一个boolean而不是直接用boolean对象来传递呢
Flag flagB = new Flag();
Flag flagC = new Flag();
public void testThread() {
Thread threadA = new Thread(new MyRunnable(flagA), "A");
Thread threadB = new Thread(new MyRunnable(flagB), "B");
Thread threadC = new Thread(new MyRunnable(flagC), "C");
Thread threadD = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
}
}, "D");
threadA.start();
threadB.start();
threadC.start();
while (!flagA.isFlag() || !flagB.isFlag() || !flagC.isFlag()) {
Thread.yield();//释放CPU资源
//this.wait(0L); //这样也可以等待 释放CPU资源
//sleep(0L);//这样也可以等待 释放CPU资源
}
threadD.start();
}
public static void main(String[] args) {
new ThreadTest().testThread();
}
}
//执行结果:
当前线程:A >> 0
当前线程:B >> 0
当前线程:C >> 0
当前线程:A >> 1
当前线程:A >> 2
当前线程:B >> 1
当前线程:C >> 1
当前线程:B >> 2
当前线程:A >> 3
当前线程:B >> 3
当前线程:A >> 4
当前线程:A >> true
当前线程:B >> 4
当前线程:B >> true
当前线程:C >> 2
当前线程:C >> 3
当前线程:C >> 4
当前线程:C >> true
当前线程:D >> Hello World!
- 原理分析
通过wihle循环以及变量控制,让当前线程等待A、B、C三个线程执行完之后在执行D线程。这种方法是理解简单,但是实现挺麻烦的,线程越多要控制的变量就越多,非常不便。
注意实现Runnable中参数传递,这里涉及到值传递和对象传递的知识点。
Thread.join方法
- 代码实现
/**
* 线程调度方法1 flag变量控制
* Created by anonyper on 2019/10/17.
*/
public class ThreadTest {
public void testThread() {
Thread threadA = new Thread(new MyRunnable(), "A");
Thread threadB = new Thread(new MyRunnable(), "B");
Thread threadC = new Thread(new MyRunnable(), "C");
Thread threadD = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
}
}, "D");
threadA.start();
threadB.start();
threadC.start();
try {
threadA.join();//阻塞等待线程A运行完
threadB.join();//阻塞等待线程B运行完
threadC.join();//阻塞等待线程C运行完
} catch (InterruptedException e) {
e.printStackTrace();
}
threadD.start();
}
public static void main(String[] args) {
new ThreadTest().testThread();
}
}
//执行结果
当前线程:A >> 0
当前线程:C >> 0
当前线程:B >> 0
当前线程:B >> 1
当前线程:A >> 1
当前线程:C >> 1
当前线程:B >> 2
当前线程:C >> 2
当前线程:A >> 2
当前线程:C >> 3
当前线程:B >> 3
当前线程:C >> 4
当前线程:A >> 3
当前线程:A >> 4
当前线程:B >> 4
当前线程:D >> Hello World!
- 原理分析
线程的join方法会阻塞等待当前线程执行完成,其源代码如下:
public final void join() throws InterruptedException {
this.join(0L);
}
public final synchronized void join(long var1) throws InterruptedException {
long var3 = System.currentTimeMillis();//当前时间
long var5 = 0L;
if (var1 < 0L) {//等待时间不能小于0
throw new IllegalArgumentException("timeout value is negative");
} else {
if (var1 == 0L) {//如果为零,则等待执行完成
while(this.isAlive()) {//判断线程是否激活,涉及到线程状态,执行完之后返回false
this.wait(0L);//等待0秒后唤醒
}
} else {//等待一定时间后判断是否执行完
while(this.isAlive()) {
long var7 = var1 - var5;
if (var7 <= 0L) {
break;
}
this.wait(var7);
var5 = System.currentTimeMillis() - var3;
}
}
}
}
我们看到join方法如果不传时间则默认等待线程执行完,但是其过程可能会被Interrupted,所以使用该方法需要针对异常做好判断。
FeatureTask的get方法
- 代码实现
/**
* 实现Callable接口,重写call方法
*/
class MyCall implements Callable<Boolean> {
@Override
public Boolean call() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
}
return true;
}
}
/**
* 线程调度方法1 flag变量控制
* Created by anonyper on 2019/10/17.
*/
public class ThreadTest {
public void testThread() {
FutureTask futureTaskA = new FutureTask(new MyCall());
FutureTask futureTaskB = new FutureTask(new MyCall());
FutureTask futureTaskC = new FutureTask(new MyCall());
Thread threadA = new Thread(futureTaskA, "A");
Thread threadB = new Thread(futureTaskB, "B");
Thread threadC = new Thread(futureTaskC, "C");
Thread threadD = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
}
}, "D");
threadA.start();
threadB.start();
threadC.start();
try {
futureTaskA.get();
futureTaskB.get();
futureTaskC.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadD.start();
}
public static void main(String[] args) {
new ThreadTest().testThread();
}
}
//运行结果
当前线程:A >> 0
当前线程:C >> 0
当前线程:B >> 0
当前线程:A >> 1
当前线程:A >> 2
当前线程:C >> 1
当前线程:A >> 3
当前线程:B >> 1
当前线程:C >> 2
当前线程:C >> 3
当前线程:B >> 2
当前线程:A >> 4
当前线程:B >> 3
当前线程:C >> 4
当前线程:B >> 4
当前线程:D >> Hello World!
- 原理分析
1、Callable接口
public interface Callable<V> {
V call() throws Exception;
}
2、Future接口
public interface Future<V> {
boolean cancel(boolean var1);//取消
boolean isCancelled();//是否取消
boolean isDone();//是否完成
V get() throws InterruptedException, ExecutionException;//返回结果
V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;//等待具体时间内返回结果
}
2、RunnableFuture接口实现了Runnable、Future接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
3、FutureTask实现了RunnableFuture接口,构造函数接受一个Callable对象
public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable;
public FutureTask(Callable<V> var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
this.callable = var1;
this.state = 0;
}
}
public void run() {
...
Callable var1 = this.callable;
var2 = var1.call;
...
}
public V get() throws InterruptedException, ExecutionException {//get方法
int var1 = this.state;
if (var1 <= COMPLETING) {//状态未完成时等待完成
var1 = this.awaitDone(false, 0L);
}
return this.report(var1);//完成后返回结果
}
private int awaitDone(boolean var1, long var2) throws InterruptedException {
long var4 = var1 ? System.nanoTime() + var2 : 0L;
FutureTask.WaitNode var6 = null;
boolean var7 = false;
while(!Thread.interrupted()) {
int var8 = this.state;
if (var8 > COMPLETING) {//完成或被打断,返回结果
if (var6 != null) {
var6.thread = null;
}
return var8;
}
if (var8 == COMPLETING) {//快要完成了,等待一会
Thread.yield();
} else if (var6 == null) {
var6 = new FutureTask.WaitNode();
} else if (!var7) {
var7 = UNSAFE.compareAndSwapObject(this, waitersOffset, var6.next = this.waiters, var6);
} else if (var1) {//有等待时间限制
var2 = var4 - System.nanoTime();
if (var2 <= 0L) {
this.removeWaiter(var6);
return this.state;
}
LockSupport.parkNanos(this, var2);
} else {
LockSupport.park(this);//通过LockSupport.park方法阻塞
}
}
//最后没有等待结果时被唤醒(LockSupport.uppark())抛出打断异常
this.removeWaiter(var6);
throw new InterruptedException();
}
4、FutureTask对象作为runnable实现类,传入Thread中,调用run方法调用的是callable的call方法,get方法在call方法没有返回时阻塞线程,等待结果返回。
该方法学会了理解起来也简单,但是相对写的类比较多,如果比较在意线程结果返回值来做判断条件的话,可以使用。
CountDownLatch实现
- 代码实现
//MyRunnable的代码
/**
* 通过CountDownLatch的方式
*/
public MyRunnable(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
...
if (countDownLatch != null) {//完成后减1
countDownLatch.countDown();
}
...
}
public class ThreadTest {
public void testThread() {
CountDownLatch countDownLatch = new CountDownLatch(3);//闭锁的数量 三个线程
//MyRunnable的实现见第一种方式代码,此处不在重复
Thread threadA = new Thread(new MyRunnable(countDownLatch), "A");
Thread threadB = new Thread(new MyRunnable(countDownLatch), "B");
Thread threadC = new Thread(new MyRunnable(countDownLatch), "C");
Thread threadD = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
}
}, "D");
threadA.start();
threadB.start();
threadC.start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
threadD.start();
}
public static void main(String[] args) {
new ThreadTest().testThread();
}
}
//执行结果
当前线程:A >> 0
当前线程:B >> 0
当前线程:C >> 0
当前线程:A >> 1
当前线程:B >> 1
当前线程:A >> 2
当前线程:A >> 3
当前线程:C >> 1
当前线程:B >> 2
当前线程:A >> 4
当前线程:C >> 2
当前线程:B >> 3
当前线程:C >> 3
当前线程:B >> 4
当前线程:C >> 4
当前线程:D >> Hello World!
- 原理分析
CountDownLatch的核心方法有三个:
1、构造方式传入一个计数数量
private final CountDownLatch.Sync sync;
public CountDownLatch(int var1) {
if (var1 < 0) {
throw new IllegalArgumentException("count < 0");
} else {
this.sync = new CountDownLatch.Sync(var1);
}
}
2、countDown计数减1
public void countDown() {
this.sync.releaseShared(1);
}
3、await阻塞等待,还有一个方法是await(long var1, TimeUnit var3),等待具体的时间
public void await() throws InterruptedException {
this.sync.acquireSharedInterruptibly(1);
}
public boolean await(long var1, TimeUnit var3) throws InterruptedException {
return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1));
}
这三个方法都关联一个类:Sync,该类继承AbstractQueuedSynchronizer类,简称AQS,AQS是用来构建锁或者其他同步组件(信号量、事件等)的基础框架类,JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等等,具体源码此处不做分析。
CountDownLatch简单理解就是await方法阻塞线程,在等待计数为0时唤醒等待,减少计数的方法可以是在不同的线程中,也可以在同一个线程中。使用也比较简单,理解也容易。
CyclicBarrier实现
- 代码实现
//MyRunnable中的代码
/**
* 通过CyclicBarrier的方式
*/
public MyRunnable(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
...
if (cyclicBarrier != null) {//通过栅栏方式
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
...
}
public class ThreadTest {
public void testThread() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);//这个地方是4,不是3
Thread threadA = new Thread(new MyRunnable(cyclicBarrier), "A");
Thread threadB = new Thread(new MyRunnable(cyclicBarrier), "B");
Thread threadC = new Thread(new MyRunnable(cyclicBarrier), "C");
Thread threadD = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
}
}, "D");
threadA.start();
threadB.start();
threadC.start();
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
threadD.start();
}
public static void main(String[] args) {
new ThreadTest().testThread();
}
}
//执行结果
当前线程:B >> 0
当前线程:B >> 1
当前线程:C >> 0
当前线程:B >> 2
当前线程:C >> 1
当前线程:B >> 3
当前线程:B >> 4
当前线程:A >> 0
当前线程:C >> 2
当前线程:C >> 3
当前线程:A >> 1
当前线程:C >> 4
当前线程:A >> 2
当前线程:A >> 3
当前线程:A >> 4
当前线程:D >> Hello World!
- 原理分析
CyclicBarrier是通过ReentrantLock锁来实现的,具体源码就不分析了,其里面也有几个重要的方法:
1、构造方法
public CyclicBarrier(int var1){//传入等待条件数
...
}
2、等待方法,当wait方法调用次数达到设定的次数之后,统一唤醒所有等待地方。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return this.dowait(false, 0L);
} catch (TimeoutException var2) {
throw new Error(var2);
}
}
CyclicBarrier和CountDownLatch有点类似,但是也有点不一样,用在跑步比赛上的区别就是:
1、CountDownLatch 计数减到0之后,阻塞的地方就可以开始跑了(阻塞了一个地方)
2、CyclicBarrier 当有运动员没有准备好(调用await方法)时,其他的运动员都等着(await阻塞),只有都准备好了再开始跑。
3、CountDownLatch计数只能用一次,CyclicBarrier可以循环使用。
所以用CyclicBarrier的实现思路,相当于让A、B、C三个线程阻塞在最后一步,然后线程D就绪,A、B、C三个线程收个尾之后线程D开始运行,严格意义上并不是A、B、C三个线程和D串行,在理论上实现了串行。
总结
以上就是我能想到的实现A、B、C三个线程并发然后和线程D串行执行的几种方法,代码都以贴出。如果有其他方法,欢迎提出补充。
网友评论