美文网首页
有顺序的多线程操作

有顺序的多线程操作

作者: 呼噜噜睡 | 来源:发表于2024-07-17 08:00 被阅读0次

    又到了天桥说书的时候了,今天来聊一聊多线程的一些东西。假如,我是说假如,突然有一天给了来了这样一个需求:有多组任务,组与组之间的任务是顺序执行的,组内的任务是并发执行的。文字说明如下:

    组1: 任务A   任务B
    组2:任务A1  任务B1  任务C1
    组3:    任务A3  
    ...
    

    就像上面文字所描述的,组1的任务是第一个执行的,任务A和任务B是并发去执行的。组2的任务要等到组1的所有任务执行完毕之后,再去并发执行任务A1 任务B1 任务C1,就这样以此类推,一直往下执行。

    是不是一听,就很简单呢,可以写个循环,遍历每一组,每一组的各个任务包装为一个Callable对象去扔到线程池去执行,并且将放入线程池的返回对象Future对象放入列表,循环遍历这个Future列表,但凡有一个future.get()是阻塞的,那么就说明任务没有执行完毕,那就继续等着,直到循环遍历完毕,就说明所有的组内任务执行完毕了。下一组的任务如法炮制。

    不得不说,这是一个取巧的办法,就是一次执行一点点的任务,根据结果再来决定是否继续执行。但总感觉这种方法太low了,不够高大上。为啥要高大上呢?一切都是为了装逼。

    我更希望的是这些任务一股脑的扔进线程池,然后根据某种机制,去保证上面要求的顺序。这样一来,题目的难度是不是就高了那么一点点呢。

    首先说一下思路吧:线程间的等待,可以使用CountDownLatch来做。每个任务的包装类Callable均持有两个CountDownLatch的引用。我们分别叫他为prev和current吧。prev这个是上一组任务的CountDownLatch,上一组任务每次执行完毕,都countdown一次。prev这个就在执行方法体第一行进行await,当上一组所有方法执行完,该线程就不再阻塞,可以执行了。current是本组内的CountDownLatch,当前任务执行完毕,就countdown一次。

    对应的伪代码如下:

    public class TestCallable implements Callable {
    
        private Data data;
    
        private CountDownLatch prevCountDownLatch;
    
        private CountDownLatch currentCountDownLatch;
    
        public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
            this.data = data;
            this.prevCountDownLatch = prevCountDownLatch;
            this.currentCountDownLatch = currentCountDownLatch;
        }
    
        @Override
        public Object call() throws Exception e{
            if(prevCountDownLatch != null){
                prevCountDownLatch.await();
            }
            try {
                //执行业务逻辑
            }finally {
                if(currentCountDownLatch != null){
                    currentCountDownLatch.countDown();
                }
                return result;
            }
        }
    }
    

    值得注意的是,构造哪个是当前的CountDownLatch,哪一个是上一个的CountDownLatch,在循环中,应该注意:

    CountDownLatch prevCountDownLatch = null;
    CountDownLatch currentCountDownLatch = null;
    for (int i = 0; i < list.size(); i++) {
      List<Data> dataList = list.get(i);
      currentCountDownLatch = new CountDownLatch(dataList.size());
      for(int j = 0; j < dataList.size(); j++){
        Data data = dataList.get(j);
        TestCallable callable = new TestCallable(data,prevCountDownLatch,currentCountDownLatch);
        //放入线程池略...
    
      }
      prevCountDownLatch = currentCountDownLatch;
    }
    

    到这里,基本上就好了,你可以享受一下窗外的湖光山色,声色犬马一番...等等,突然需求又增加了:如果前面组内的任何一个任务失败或者出错了,后续的任务都要取消执行。

    想想也挺合理,但是,这些任务可是放入了线程池啊,我如何去把它们捉出来,然后残忍的杀死呢?或者换一种思路,我需要有某种标记,去标记当前任务是否可以执行,如果不可以执行,就返回。那么后续的任务呢,可以在返回之前,将直接的后续任务标记为不可执行。每个任务执行前,都要判断可执行标记。伪代码如下:

    public class TestCallable implements Callable {
    
        private Data data;
    
        private CountDownLatch prevCountDownLatch;
    
        private CountDownLatch currentCountDownLatch;
    
        private volatile boolean isCancel = false;//是否取消执行  注意volatile关键字,这是线程可见的重要手段
    
        /**
         * 内部持有的引用  用来 控制这些的内部标记  isCancel。当上一组的任意一个任务执行失败,后续的就要取消执行
         * 不可以以对这些内部引用callable进行其他操作!
         */
        private List<TestCallable> nextCallableList;
    
        public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
            this.data = data;
            this.prevCountDownLatch = prevCountDownLatch;
            this.currentCountDownLatch = currentCountDownLatch;
        }
    
        @Override
        public Object call(){
            if(prevCountDownLatch != null){
                try {
                    prevCountDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return this.doCancel();
                }
            }
    
            //注意放在prevCountDownLatch.await()  不要放在这之前
            if(this.isCancel){//如果当前线程是取消执行标记  那么执行取消的逻辑
                return this.doCancel();
            }
    
            boolean result = false;
            
            try {
                //逻辑操作
                result = true;
            }catch (Exception e){
                e.printStackTrace();
                result = false;
                //将所有下一组的线程的取消标记置为true
                this.cancelAllNext();
            }finally {
                if(currentCountDownLatch != null){
                    currentCountDownLatch.countDown();
                }
                return result;
            }
        }
    
        public List<TestCallable> getNextCallableList() {
            return nextCallableList;
        }
    
        public void setNextCallableList(List<TestCallable> nextCallableList) {
            this.nextCallableList = nextCallableList;
        }
    
        public Data getData() {
            return data;
        }
    
        private void cancel(){
            this.isCancel = true;
        }
    
        private void cancelAllNext(){
            if(CollectionUtils.isEmpty(nextCallableList)){
                return;
            }
            for (int i = 0; i < nextCallableList.size(); i++) {
                TestCallable nextCallable = nextCallableList.get(i);
                nextCallable.cancel();
            }
        }
    
        private Object doCancel(){
            //首先将所有下一组的线程的取消标记置为true
            this.cancelAllNext();
            //当前线程继续countdown  这样下一组线程才轮的到执行
            if(currentCountDownLatch != null){
                currentCountDownLatch.countDown();
            }
            return false;
        }
    }
    

    这些Callable的list引用如何构造呢:

    //key: 第几组任务  value:对应组的任务包装为callable的列表
    Map<Integer,List<TestCallable>> callableMap = new HashMap<>(); 
    CountDownLatch prevCountDownLatch = null;
    CountDownLatch currentCountDownLatch = null;
    for (int i = 0; i < list.size(); i++) {
      List<Data> dataList = list.getChildList();
      currentCountDownLatch = new CountDownLatch(dataList.size());
      for(int j = 0; j < dataList.size(); j++){
        Data data = dataList.get(j);
        List<TestCallable> callableList = callableMap.get(i);
        if(CollectionUtils.isEmpty(callableList)){
          callableList = new ArrayList<>();
        }
        callableList.add(callable);
        callableMap.put(i,callableList);
      }
      prevCountDownLatch = currentCountDownLatch;
    }
    
    for (int i = 0; i < list.size(); i++) {
      List<TestCallable> callableList = callableMap.get(i);
      List<TestCallable> nextCallableList = callableMap.get(i + 1);
      for (int j = 0; j < callableList.size(); j++) {
        TestCallable callable = callableList.get(j);
        callable.setNextCallableList(nextCallableList);
        threadPool.submit(callable);
      }
    }
    

    好了,就完事了。

    相关文章

      网友评论

          本文标题:有顺序的多线程操作

          本文链接:https://www.haomeiwen.com/subject/kaklhjtx.html