美文网首页线程
java并发编程(4)ForkJoin实现接口并发调用

java并发编程(4)ForkJoin实现接口并发调用

作者: monkey01 | 来源:发表于2017-09-13 17:48 被阅读427次

    前面几片文章都介绍了线程的基本开发,今天介绍在JDK7中新增的ForkJoin框架,并介绍该框架的在接口并发调用下的使用场景和demo。

    1.ForkJoin核心类

    Fork/Join框架是用来解决能够通过分治技术(Divide and Conquer Technique)将问题拆分成小任务的问题。
    Fork/Join框架的核心是由下列两个类组成的。

    ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm)。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。
    ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。
    Fork/Join框架提供了在一个任务里执行fork()和join()操作的机制和控制任务状态的方法。通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类。

    RecursiveAction:用于任务没有返回结果的场景。
    RecursiveTask:用于任务有返回结果的场景。实现compute()方法,实现任务的执行逻辑

    2.接口并发调用实例

    使用Fork/Join框架,需要实现一个继承RecursiveTask这个抽象类并实现compute方法,下面举个例子说明下,这个例子是我们日常经常会遇到的一个场景,前端app或者web一个页面中有很多接口调用,这些接口之间没有依赖关系,前端同学希望后端同学提供一个组合接口,最简单的就是后端同学串行调用接口,但是这就会导致接口调用时间也是所有接口调用时间的累加,这样很容易前端请求超时,这时我们就希望有各功能可以实现接口的并发调用,最后将返回报文汇总,这个例子就是简单实现了类似的功能:

    public class ComposeTask extends RecursiveTask<String> {
        private String[] urls;
        private String[] args;
    
        public ComposeTask(String[] urls, String[] args){
            this.urls = urls;
            this.args = args;
        }
    
        @Override
        protected String compute() {
            String rtn = new String();
            if(urls.length == 1){
                //执行url
                rtn = urls[0] + " || ";
                try {
                    //模拟执行网络请求
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                ArrayList<ComposeTask> subTasks = new ArrayList<ComposeTask>();
                for(String url : urls){
                    String[] arrayUrl = {url};
                    ComposeTask subTask = new ComposeTask(arrayUrl, args);
                    subTasks.add(subTask);
                    subTask.fork();
                }
    
                for(ComposeTask t : subTasks){
                    rtn = rtn + t.join();
                }
            }
            return rtn;
        }
    }
    

    这个代码是模拟实现一个并行网络请求的功能,从代码可以看到,除了需要重写compute()方法,还需要新增一个构造函数,在compute方法中通过入参可以判断出是否需要进行fork线程拆分。在compute方法中的逻辑主要有两部分组成,第一部分是通过urls.length是否等于1,来判断是否只有一个网络请求,如果只有一个网络请求,则调用网络方法进行执行网络请求,这里简单的让线程等待1秒来模拟耗时操作。这里子线程的拆分和执行过程通过递归实现,当符合拆分条件则新建ComposeTask子任务并调用fork()方法进行子线程执行,并在任务全部拆分完后对所有的子任务再调用join方法获取每个子线程的返回值,这里子线程返回的类型为继承RecursiveTask定义的范型类型String,这里除了int、boolean之类的基础类型不能使用,其他类型或者自定义类型都可以定义为RecursiveTask的范型类型。这里的join方法和thread的join方法一样,一旦调用RecursiveTask的join方法后,子任务会处于等待状态,ForkJoin线程池中的子线程会互相帮忙,如果一个任务完成释放出来后,会去执行等待中的子任务。

    下面我们再来看下调用并行task的代码如何编写,直接上代码:

        public static void main(String args[]) throws ExecutionException, InterruptedException {
            System.out.println(System.currentTimeMillis());
            ForkJoinPool forkJoinPool = new ForkJoinPool(4);
            String[] urls = {"http://localhost:8080/uc/getUser","http://localhost:8080/uc/addUser",
                    "http://localhost:8080/uc/getUser2","http://localhost:8080/uc/addUser2",
                    "http://localhost:8080/uc/getUser3","http://localhost:8080/uc/addUser3",
                    "http://localhost:8080/uc/getUser4","http://localhost:8080/uc/addUser4"};
            String[] argus = {};
            ComposeTask task = new ComposeTask(urls, argus);
            ForkJoinTask<String> result = forkJoinPool.submit(task);
            String rsp = result.get();
            System.out.println(System.currentTimeMillis());
            System.out.println("fork/join rsp = " + rsp);
        }
    

    为了防止线程被fork太多,这里也需要使用ForkJoinPool来保证线程不会太多而导致系统资源耗尽,一般线程池的大小和机器的cpu内核数相等。创建ForkJoinPool之后通过submit调用主task,这是一个同步调用,这个任务将等待子任务完成,然后继续执行(也可能是结束)。当一个主任务等待它的子任务时,执行这个主任务的工作者线程接收另一个等待执行的任务并开始执行。最后通过ForkJoinTask的get()方法获取所有子任务合并的返回结果。这样就实现了大任务的拆分子线程同步执行,对于一个前端页面有很多无依赖的接口调用的情况可以使用ForkJoin的方式来实现。

    相关文章

      网友评论

        本文标题:java并发编程(4)ForkJoin实现接口并发调用

        本文链接:https://www.haomeiwen.com/subject/okjlsxtx.html