在公司一个数据导入的场景中因为需要导入的数据量非常大,在本地导入一次需要十几分钟,估算线上导入的时间会翻倍,为了缩短导入时间,需要使用并发,但是导入完成后需要给用户反馈,而反馈代码又写在主线程中,所以就需要,在线程启动后,主线程挂起,在所有线程完成后,主线程恢复执行。
问题提出后,心中有了个大致方案,并且在测试后能够顺利执行,但是在TestCase完成后,发现JDK 1.5中就有相似的场景解决方案,所以学习研究了一番,再次记录,并分享给大家
原有方案
原有的解决方案是利用以下这几个包来实现的,通过Condition锁和while(true)的等待通知模式,实现了主线程的挂起和恢复
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
主线程 挂起机制
主线程通过while(true) 和 index计数 实现了不停的挂起的操作
lock.lock();
int index = 0;
while (true) {
// threaNum 线程数量
if (index < threadNum) {
condition.await();
} else {
break;
}
index++;
}
lock.unlock();
线程 唤醒机制
在执行的线程最后执行唤醒机制,唤醒主线程,
lock.lock();
condition.signalAll();
lock.unlock();
注
通过线程的唤醒,和主线程的挂起操作,主线程不停被唤醒,然后再次挂起,直到最后一个线程唤醒主线程,index = threadNum 主线程跳出循环,继续执行
JDK 原生方案
JDK 1.5中 提供了CountDownLatch这个并发工具类,解决了多线程并发,主线程等待最后执行的效果。
构造器
// count 为 线程数量
public CountDownLatch(int count)
常用方法
// //调用此方法的线程会被挂起,直到count值为0才继续执行
public void await();
// 在等待timeOut时间后,如果count值还没到0 立即执行当前线程
public boolean await(long timeout, TimeUnit unit)
// count减一
public void countDown()
案例
public class TestCase {
public static void main(String[] args) {
// 创建对象,并声明有2个线程需要执行
final CountDownLatch countDownLatch= new CountDownLatch(2);
new Thread(){
public void run() {
try {
System.out.println("我是小弟1:"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("我是小弟1:"+Thread.currentThread().getName()+"执行完毕");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
new Thread(){
public void run() {
try {
System.out.println("我是小弟2:"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("我是小弟2:"+Thread.currentThread().getName()+"执行完毕");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}.start();
try {
System.out.println("等待2小弟干活呢...");
countDownLatch.await();
System.out.println("2个小弟干完了");
System.out.println("老大我要继续干活了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果
我是小弟1:Thread-0正在执行
我是小弟2:Thread-1正在执行
等待2小弟干活呢...
我是小弟1:Thread-0执行完毕
我是小弟2:Thread-1执行完毕
2个小弟干完了
老大我要继续干活了
注
大家可以发现 使用CountDownLatch 类来解决问题,更简洁和方便,不需要在写额外的循环和锁机制
concurrent中其他有趣和实用的工具类
CyclicBarrier
CyclicBarrier翻译回环栅栏,实现的功能是让多个线程运行到某一标志点后挂起,当所有线程都到此标志位后再一起运行,类似起跑线,当所有人都各就位后才能唤醒,开始奔跑。
应用场景 - 多个线程做任务,等到达集合点同步后交给后面的线程做汇总
构造器
// count 指明多少个线程要到达特定标志
public CyclicBarrier(int count) {}
// barrierAction为当所有线程到特定标志位后执行的内容
public CyclicBarrier(int count, Runnable barrierAction) {}
方法
// 挂起当前线程,知道所有线程到达标志位,在执行
public int await() ;
// 挂起timeOut时间,如果所有线程还未到位,则到位的线程直接继续执行
public int await(long timeout, TimeUnit unit);
案例1-指定线程数
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++) new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}
运行结果
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
案例2-指定执行内容
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {
@Override
public void run() {
System.out.println("当前线程"+Thread.currentThread().getName());
}
});
for(int i=0;i<N;i++) new Writer(barrier).start();
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}
运行结果
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2正在写入数据...
线程Thread-3正在写入数据...
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
当前线程Thread-3
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
注
从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。
案例3-CyclicBarrier重用
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}
try {
Thread.sleep(25000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CyclicBarrier重用");
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
}
}
}
运行结果
线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
Thread-0所有线程写入完毕,继续处理其他任务...
Thread-3所有线程写入完毕,继续处理其他任务...
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-2所有线程写入完毕,继续处理其他任务...
CyclicBarrier重用
线程Thread-4正在写入数据...
线程Thread-5正在写入数据...
线程Thread-6正在写入数据...
线程Thread-7正在写入数据...
线程Thread-7写入数据完毕,等待其他线程写入完毕
线程Thread-5写入数据完毕,等待其他线程写入完毕
线程Thread-6写入数据完毕,等待其他线程写入完毕
线程Thread-4写入数据完毕,等待其他线程写入完毕
Thread-4所有线程写入完毕,继续处理其他任务...
Thread-5所有线程写入完毕,继续处理其他任务...
Thread-6所有线程写入完毕,继续处理其他任务...
Thread-7所有线程写入完毕,继续处理其他任务...
注
从执行结果可以看出,在初次的4个线程越过barrier状态后,又可以用来进行新一轮的使用。而CountDownLatch无法进行重复使用。
Semaphore
Semaphore翻译成字面意思为 信号量,信号量就是可以声明多把锁(包括一把锁:此时为互斥信号量)。
举个例子:一个房间如果只能容纳5个人,多出来的人必须在门外面等着。如何去做呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一个人就取走一把钥匙,没有钥匙的不能进入该房间而是在外面等待。每出来一个人就把钥匙放回原处以方便别人再次进入。
应用场景 - 流量控制,即控制能够访问的最大线程数。
构造器
//参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}
//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可,默认是非公平的
public Semaphore(int permits, boolean fair) {}
常用方法
//获取一个许可,若无许可能够获得,则会一直等待,直到获得许可
public void acquire();
//获取x个许可
public void acquire(int x);
//释放一个许可,注意,在释放许可之前,必须先获获得许可。
public void release() ;
//释放x个许可
public void release(int x) ;
以上4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法
//尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire()
//尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit)
//尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits)
//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
注
另外还可以通过availablePermits()方法得到可用的许可数目。
案例
假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现
public class Test {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++) new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人"+this.num+"占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人"+this.num+"释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果
工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产...
工人7占用一个机器在生产...
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产...
工人3释放出机器
工人7释放出机器
工人6释放出机器
注
1. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
- CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
- 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
- CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
2. Semaphore和锁类似,一般用于控制对某组资源的访问权限。
Phaser
Phaser是更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时(CyclicBarrier是分成两步)就可以选择使用Phaser。
Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。
Phaser状态
- 活跃态:当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
- 终止态:当所有的线程都取消注册的时候,Phaser就处于终止态,此时Phaser没有任何参与者。
常用方法
// 类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行
arriveAndAwaitAdvance()
// 把执行到此的线程从Phaser中注销掉
arriveAndDeregister()
// 判断Phaser是否终止
isTerminated()
// 将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
register()
// 强制Phaser进入终止态
forceTermination()
案例
使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:
1. 在执行的文件夹及其子文件夹中获取扩展名为.log的文件
2. 对每一步的结果进行过滤,删除修改时间超过24小时的文件
3. 将结果打印到控制台
在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)
文件查找类
public class FileSearch implements Runnable {
private String initPath;
private String end;
private List<String> results;
private Phaser phaser;
public FileSearch(String initPath, String end, Phaser phaser) {
this.initPath = initPath;
this.end = end;
this.phaser=phaser;
results=new ArrayList<>();
}
@Override
public void run() {
phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了
System.out.printf("%s: Starting.\n",Thread.currentThread().getName());
// 1st Phase: 查找文件
File file = new File(initPath);
if (file.isDirectory()) {
directoryProcess(file);
}
// 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行
if (!checkResults()){
return;
}
// 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集
filterResults();
// 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行
if (!checkResults()){
return;
}
// 3rd Phase: 显示结果
showInfo();
phaser.arriveAndDeregister();//任务完成,注销掉所有的线程
System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
}
private void showInfo() {
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
}
// Waits for the end of all the FileSearch threads that are registered in the phaser
phaser.arriveAndAwaitAdvance();
}
private boolean checkResults() {
if (results.isEmpty()) {
System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
//结果为空,Phaser完成并把该线程从Phaser中移除掉
phaser.arriveAndDeregister();
return false;
} else {
// 等待所有线程查找完成
System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
phaser.arriveAndAwaitAdvance();
return true;
}
}
private void filterResults() {
List<String> newResults=new ArrayList<>();
long actualDate=new Date().getTime();
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
long fileDate=file.lastModified();
if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
newResults.add(results.get(i));
}
}
results=newResults;
}
private void directoryProcess(File file) {
// Get the content of the directory
File list[] = file.listFiles();
if (list != null) {
for (int i = 0; i < list.length; i++) {
if (list[i].isDirectory()) {
// If is a directory, process it
directoryProcess(list[i]);
} else {
// If is a file, process it
fileProcess(list[i]);
}
}
}
}
private void fileProcess(File file) {
if (file.getName().endsWith(end)) {
results.add(file.getAbsolutePath());
}
}
}
主函数
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);
Thread systemThread = new Thread(system, "System");
systemThread.start();
Thread appsThread = new Thread(apps, "Apps");
appsThread.start();
Thread documentsThread = new Thread(documents, "Documents");
documentsThread.start();
try {
systemThread.join();
appsThread.join();
documentsThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Terminated: %s\n", phaser.isTerminated());
}
注
例子中Phaser分了三个步骤:查找文件、过滤文件、打印结果。并且在查找文件和过滤文件结束后对结果进行分析,如果是空的,将此线程从Phaser中注销掉。也就是说,下一阶段,该线程将不参与运行。
在run()方法中,开头调用了phaser的arriveAndAwaitAdvance()方法来保证所有线程都启动了之后再开始查找文件。在查找文件和过滤文件阶段结束之后,都对结果进行了处理。即:如果结果是空的,那么就把该条线程移除,如果不空,那么等待该阶段所有线程都执行完该步骤之后在统一执行下一步。最后,任务执行完后,把Phaser中的线程均注销掉。
Phaser其实有两个状态:活跃态和终止态。当存在参与同步的线程时,Phaser就是活跃的。并且在每个阶段结束的时候同步。当所有参与同步的线程都取消注册的时候,Phase就处于终止状态。在这种状态下,Phaser没有任务参与者。
Phaser主要功能就是执行多阶段任务,并保证每个阶段点的线程同步。在每个阶段点还可以条件或者移除参与者。主要涉及方法arriveAndAwaitAdvance()和register()和arriveAndDeregister()
网友评论