又到了天桥说书的时候了,今天来聊一聊多线程的一些东西。假如,我是说假如,突然有一天给了来了这样一个需求:有多组任务,组与组之间的任务是顺序执行的,组内的任务是并发执行的。文字说明如下:
组1: 任务A 任务B
组2:任务A1 任务B1 任务C1
组3: 任务A3
...
就像上面文字所描述的,组1的任务是第一个执行的,任务A和任务B是并发去执行的。组2的任务要等到组1的所有任务执行完毕之后,再去并发执行任务A1 任务B1 任务C1,就这样以此类推,一直往下执行。
是不是一听,就很简单呢,可以写个循环,遍历每一组,每一组的各个任务包装为一个Callable对象去扔到线程池去执行,并且将放入线程池的返回对象Future对象放入列表,循环遍历这个Future列表,但凡有一个future.get()是阻塞的,那么就说明任务没有执行完毕,那就继续等着,直到循环遍历完毕,就说明所有的组内任务执行完毕了。下一组的任务如法炮制。
不得不说,这是一个取巧的办法,就是一次执行一点点的任务,根据结果再来决定是否继续执行。但总感觉这种方法太low了,不够高大上。为啥要高大上呢?一切都是为了装逼。
我更希望的是这些任务一股脑的扔进线程池,然后根据某种机制,去保证上面要求的顺序。这样一来,题目的难度是不是就高了那么一点点呢。
首先说一下思路吧:线程间的等待,可以使用CountDownLatch来做。每个任务的包装类Callable均持有两个CountDownLatch的引用。我们分别叫他为prev和current吧。prev这个是上一组任务的CountDownLatch,上一组任务每次执行完毕,都countdown一次。prev这个就在执行方法体第一行进行await,当上一组所有方法执行完,该线程就不再阻塞,可以执行了。current是本组内的CountDownLatch,当前任务执行完毕,就countdown一次。
对应的伪代码如下:
public class TestCallable implements Callable {
private Data data;
private CountDownLatch prevCountDownLatch;
private CountDownLatch currentCountDownLatch;
public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
this.data = data;
this.prevCountDownLatch = prevCountDownLatch;
this.currentCountDownLatch = currentCountDownLatch;
}
@Override
public Object call() throws Exception e{
if(prevCountDownLatch != null){
prevCountDownLatch.await();
}
try {
//执行业务逻辑
}finally {
if(currentCountDownLatch != null){
currentCountDownLatch.countDown();
}
return result;
}
}
}
值得注意的是,构造哪个是当前的CountDownLatch,哪一个是上一个的CountDownLatch,在循环中,应该注意:
CountDownLatch prevCountDownLatch = null;
CountDownLatch currentCountDownLatch = null;
for (int i = 0; i < list.size(); i++) {
List<Data> dataList = list.get(i);
currentCountDownLatch = new CountDownLatch(dataList.size());
for(int j = 0; j < dataList.size(); j++){
Data data = dataList.get(j);
TestCallable callable = new TestCallable(data,prevCountDownLatch,currentCountDownLatch);
//放入线程池略...
}
prevCountDownLatch = currentCountDownLatch;
}
到这里,基本上就好了,你可以享受一下窗外的湖光山色,声色犬马一番...等等,突然需求又增加了:如果前面组内的任何一个任务失败或者出错了,后续的任务都要取消执行。
想想也挺合理,但是,这些任务可是放入了线程池啊,我如何去把它们捉出来,然后残忍的杀死呢?或者换一种思路,我需要有某种标记,去标记当前任务是否可以执行,如果不可以执行,就返回。那么后续的任务呢,可以在返回之前,将直接的后续任务标记为不可执行。每个任务执行前,都要判断可执行标记。伪代码如下:
public class TestCallable implements Callable {
private Data data;
private CountDownLatch prevCountDownLatch;
private CountDownLatch currentCountDownLatch;
private volatile boolean isCancel = false;//是否取消执行 注意volatile关键字,这是线程可见的重要手段
/**
* 内部持有的引用 用来 控制这些的内部标记 isCancel。当上一组的任意一个任务执行失败,后续的就要取消执行
* 不可以以对这些内部引用callable进行其他操作!
*/
private List<TestCallable> nextCallableList;
public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
this.data = data;
this.prevCountDownLatch = prevCountDownLatch;
this.currentCountDownLatch = currentCountDownLatch;
}
@Override
public Object call(){
if(prevCountDownLatch != null){
try {
prevCountDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
return this.doCancel();
}
}
//注意放在prevCountDownLatch.await() 不要放在这之前
if(this.isCancel){//如果当前线程是取消执行标记 那么执行取消的逻辑
return this.doCancel();
}
boolean result = false;
try {
//逻辑操作
result = true;
}catch (Exception e){
e.printStackTrace();
result = false;
//将所有下一组的线程的取消标记置为true
this.cancelAllNext();
}finally {
if(currentCountDownLatch != null){
currentCountDownLatch.countDown();
}
return result;
}
}
public List<TestCallable> getNextCallableList() {
return nextCallableList;
}
public void setNextCallableList(List<TestCallable> nextCallableList) {
this.nextCallableList = nextCallableList;
}
public Data getData() {
return data;
}
private void cancel(){
this.isCancel = true;
}
private void cancelAllNext(){
if(CollectionUtils.isEmpty(nextCallableList)){
return;
}
for (int i = 0; i < nextCallableList.size(); i++) {
TestCallable nextCallable = nextCallableList.get(i);
nextCallable.cancel();
}
}
private Object doCancel(){
//首先将所有下一组的线程的取消标记置为true
this.cancelAllNext();
//当前线程继续countdown 这样下一组线程才轮的到执行
if(currentCountDownLatch != null){
currentCountDownLatch.countDown();
}
return false;
}
}
这些Callable的list引用如何构造呢:
//key: 第几组任务 value:对应组的任务包装为callable的列表
Map<Integer,List<TestCallable>> callableMap = new HashMap<>();
CountDownLatch prevCountDownLatch = null;
CountDownLatch currentCountDownLatch = null;
for (int i = 0; i < list.size(); i++) {
List<Data> dataList = list.getChildList();
currentCountDownLatch = new CountDownLatch(dataList.size());
for(int j = 0; j < dataList.size(); j++){
Data data = dataList.get(j);
List<TestCallable> callableList = callableMap.get(i);
if(CollectionUtils.isEmpty(callableList)){
callableList = new ArrayList<>();
}
callableList.add(callable);
callableMap.put(i,callableList);
}
prevCountDownLatch = currentCountDownLatch;
}
for (int i = 0; i < list.size(); i++) {
List<TestCallable> callableList = callableMap.get(i);
List<TestCallable> nextCallableList = callableMap.get(i + 1);
for (int j = 0; j < callableList.size(); j++) {
TestCallable callable = callableList.get(j);
callable.setNextCallableList(nextCallableList);
threadPool.submit(callable);
}
}
好了,就完事了。
网友评论