美文网首页
【JAVA】应用中的并行

【JAVA】应用中的并行

作者: Y了个J | 来源:发表于2019-03-22 22:57 被阅读0次

    下面看一个服务的调用链


    屏幕快照 2019-03-22 下午10.56.41.png

    设想一下这5个查询服务,平均每次消耗50ms,那么本次调用至少是250ms,我们细想一下,在这个这五个服务其实并没有任何的依赖,谁先获取谁后获取都可以,那么我们可以想想,是否可以用多重影分身之术,同时获取这五个服务的信息呢?优化如下:

    屏幕快照 2019-03-22 下午10.58.07.png

    将这五个查询服务并行查询,在理想情况下可以优化至50ms。当然说起来简单,我们真正如何落地呢?

    1.CountDownLatch

    CountDownLatch,可以将其看成是一个计数器,await()方法可以阻塞至超时或者计数器减至0,其他线程当完成自己目标的时候可以减少1,利用这个机制我们可以将其用来做并发。 可以用如下的代码实现我们上面的下订单的需求:

    public class CountDownTask {
        private static final int CORE_POOL_SIZE = 4;
        private static final int MAX_POOL_SIZE = 12;
        private static final long KEEP_ALIVE_TIME = 5L;
        private final static int QUEUE_SIZE = 1600;
    
        protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE)
        );
    
        public static void main(String[] args) throws InterruptedException {
            // 新建一个为5的计数器
            CountDownLatch countDownLatch = new CountDownLatch(5);
    
            OrderInfo orderInfo = new OrderInfo();
            THREAD_POOL.execute(() -> {
                System.out.println("当前任务Customer,线程名字为:"+ Thread.currentThread().getName());
                orderInfo.setCustomerInfo(new CustomerInfo());
                countDownLatch.countDown();
            });
            THREAD_POOL.execute(() -> {
                System.out.println("当前任务Discount,线程名字为:"+ Thread.currentThread().getName());
                orderInfo.setDiscountInfo(new DiscountInfo());
                countDownLatch.countDown();
            });
            THREAD_POOL.execute(() -> {
                System.out.println("当前任务Food,线程名字为:"+ Thread.currentThread().getName());
                orderInfo.setFoodListInfo(new FoodListInfo());
                countDownLatch.countDown();
            });
            THREAD_POOL.execute(() -> {
                System.out.println("当前任务Tenant,线程名字为:"+ Thread.currentThread().getName());
                orderInfo.setTenantInfo(new TenantInfo());
                countDownLatch.countDown();
            });
            THREAD_POOL.execute(() -> {
                System.out.println("当前任务OtherInfo,线程名字为:"+ Thread.currentThread().getName());
                orderInfo.setOtherInfo(new OtherInfo());
                countDownLatch.countDown();
            });
            countDownLatch.await(1, TimeUnit.SECONDS);
            System.out.println("主线程:"+ Thread.currentThread().getName());
        }
    }
    

    建立一个线程池(具体配置根据具体业务,具体机器配置),进行并发的执行我们的任务(生成用户信息,菜品信息等),最后利用await方法阻塞等待结果成功返回。

    2.CompletableFuture

    CountDownLatch虽然能实现我们需要满足的功能但是其任然有个问题是,在我们的业务代码需要耦合CountDownLatch的代码,比如在我们获取用户信息之后我们会执行countDownLatch.countDown(),很明显我们的业务代码显然不应该关心这一部分逻辑,并且在开发的过程中万一写漏了,那我们的await方法将只会被各种异常唤醒。

    在JDK1.8中提供了一个类CompletableFuture,它是一个多功能的非阻塞的Future。(什么是Future:用来代表异步结果,并且提供了检查计算完成,等待完成,检索结果完成等方法。)

    我们将每个任务的计算完成的结果都用CompletableFuture来表示,利用CompletableFuture.allOf汇聚成一个大的CompletableFuture,那么利用get()方法就可以阻塞。

    public class CompletableFutureParallel {
    
        private static final int CORE_POOL_SIZE = 4;
        private static final int MAX_POOL_SIZE = 12;
        private static final long KEEP_ALIVE_TIME = 5L;
        private final static int QUEUE_SIZE = 1600;
    
        protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE)
        );
    
        public static void main(String[] args) throws Exception {
            OrderInfo orderInfo = new OrderInfo();
    
            List futures = new ArrayList<>();
    
            futures.add(CompletableFuture.runAsync(() -> {
                System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName());
                orderInfo.setCustomerInfo(new CustomerInfo());
            }, THREAD_POOL));
    
            futures.add(CompletableFuture.runAsync(() -> {
                System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName());
                orderInfo.setDiscountInfo(new DiscountInfo());
            }, THREAD_POOL));
    
            futures.add(CompletableFuture.runAsync(() -> {
                System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName());
                orderInfo.setFoodListInfo(new FoodListInfo());
            }, THREAD_POOL));
    
            futures.add(CompletableFuture.runAsync(() -> {
                System.out.println("当前任务Other,线程名字为:" + Thread.currentThread().getName());
                orderInfo.setOtherInfo(new OtherInfo());
            }, THREAD_POOL));
    
            CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
            allDoneFuture.get(10, TimeUnit.SECONDS);
            System.out.println(orderInfo);
        }
    }
    

    可以看见我们使用CompletableFuture能很快的完成的需求,当然这还不够。

    3.Fork/Join

    我们上面用CompletableFuture完成了我们对多组任务并行执行,但是其依然是依赖我们的线程池,在我们的线程池中使用的是阻塞队列,也就是当我们某个线程执行完任务的时候需要通过这个阻塞队列进行,那么肯定会发生竞争,所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。

    屏幕快照 2019-03-22 下午11.17.16.png

    ForkJoinPool中每个线程都有自己的工作队列,并且采用Work-Steal算法防止线程饥饿。 Worker线程用LIFO的方法取出任务,但是会用FIFO的方法去偷取别人队列的任务,这样就减少了锁的冲突。

    屏幕快照 2019-03-22 下午11.18.20.png

    网上这个框架的例子很多,我们看看如何使用代码其完成我们上面的下订单需求:

    public class OrderTask extends RecursiveTask {
        
        @Override
        protected OrderInfocompute() {
            System.out.println("执行" + this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());                
    
            CustomerTask customerTask = new CustomerTask();
    
            TenantTask tenantTask = new TenantTask();
    
            DiscountTask discountTask = new DiscountTask();
    
            FoodTask foodTask = new FoodTask();
    
            OtherTask otherTask = new OtherTask();
    
            invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);
    
            OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());
            returnorderInfo;
        }
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1);
            System.out.println(forkJoinPool.invoke(new OrderTask()));
        }
    }
    
    class CustomerTask extends RecursiveTask {
        @Override
        protected CustomerInfocompute() {
            System.out.println("执行" + this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
            return new CustomerInfo();
        }
    }
    
    class TenantTask extends RecursiveTask {
        @Override
        protected TenantInfocompute() {
            System.out.println("执行" + this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
            return new TenantInfo();
        }
    }
    
    class DiscountTask extends RecursiveTask {
        @Override
        protected DiscountInfocompute() {
            System.out.println("执行" + this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
            return new DiscountInfo();
        }
    }
    
    class FoodTask extends RecursiveTask {
        @Override
        protected FoodListInfocompute() {
            System.out.println("执行" + this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
            return new FoodListInfo();
        }
    }
    
    class OtherTask extends RecursiveTask {
        @Override
        protected OtherInfocompute() {
            System.out.println("执行" + this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());
            return new OtherInfo();
        }
    }
    

    我们定义一个OrderTask并且定义五个获取信息的任务,在compute中分别fork执行这五个任务,最后在将这五个任务的结果通过Join获得,最后完成我们的并行化的需求。

    4. parallelStream

    在jdk1.8中提供了并行流的API,当我们使用集合的时候能很好的进行并行处理,下面举了一个简单的例子从1加到100:

    public class ParallelStream {
        public static void main(String[] args) {
            ArrayList list = new ArrayList();
    
            for (int i = 1; i <= 100; i++) {
                list.add(i);
            }
    
            LongAdder sum = new LongAdder();
            list.parallelStream().forEach(integer -> {
                System.out.println("当前线程" + Thread.currentThread().getName());
                sum.add(integer);
            });
            System.out.println(sum);
        }
    }
    

    parallelStream中底层使用的那一套也是Fork/Join的那一套,默认的并发程度是可用CPU数-1。

    5.分片

    可以想象有这么一个需求,每天定时对id在某个范围之间的用户发券,比如这个范围之间的用户有几百万,如果给一台机器发的话,可能全部发完需要很久的时间,所以分布式调度框架比如:elastic-job。都提供了分片的功能,比如你用50台机器,那么id%50=0的在第0台机器上,=1的在第1台机器上发券,那么我们的执行时间其实就分摊到了不同的机器上了。

    并行化注意事项

    线程安全:在parallelStream中我们列举的代码中使用的是LongAdder,并没有直接使用我们的Integer和Long,这个是因为在多线程环境下Integer和Long线程不安全。所以线程安全我们需要特别注意。
    合理参数配置:可以看见我们需要配置的参数比较多,比如我们的线程池的大小,等待队列大小,并行度大小以及我们的等待超时时间等等,我们都需要根据自己的业务不断的调优防止出现队列不够用或者超时时间不合理等等。

    CompletableFuture详细介绍

    JDK8以前的Future

    public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Future<String> future = executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName();
                }
            });
    
            doSomethingElse();//在我们异步操作的同时一样可以做其他操作
    
            try {
                String res = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    

    上面展示了我们的线程可以并发方式调用另一个线程去做我们耗时的操作。当我们必须依赖我们的异步结果的时候我们就可以调用get方法去获得。当我们调用get方法的时候如果我们的任务完成就可以立马返回,但是如果任务没有完成就会阻塞,直到超时为止。

    Future底层是怎么实现的呢? 我们首先来到我们ExecutorService的代码中submit方法这里会返回一个Future

    public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    

    在sumbmit中会对我们的Callable进行包装封装成我们的FutureTask,我们最后的Future其实也是Future的实现类FutureTask,FutureTask实现了Runnable接口所以这里直接调用execute。在FutureTask代码中的run方法代码如下:

    public void run() {
            if (state != NEW ||  !UNSAFE.compareAndSwapObject(this, runnerOffset,  null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } 
            .......
        }
    

    可以看见当我们执行完成之后会set(result)来通知我们的结果完成了。set(result)代码如下:

    protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    

    首先用CAS置换状态为完成,以及替换结果,当替换结果完成之后,才会替换为我们的最终状态,这里主要是怕我们设置完COMPLETING状态之后最终值还没有真正的赋值出去,而我们的get就去使用了,所以还会有个最终状态。我们的get()方法的代码如下:

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    

    首先获得当前状态,然后判断状态是否完成,如果没有完成则进入awaitDone循环等待,这也是我们阻塞的代码,然后返回我们的最终结果。

    缺陷
    我们的Future使用很简单,这也导致了如果我们想完成一些复杂的任务可能就比较难。比如下面一些例子:
    将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果。
    当Future集合中某个任务最快结束时,返回结果。
    等待Future结合中的所有任务都完成。
    通过编程方式完成一个Future任务的执行。
    应对Future的完成时间。也就是我们的回调通知。

    CompletableFuture
    CompletableFuture是JDK8提出的一个支持非阻塞的多功能的Future,同样也是实现了Future接口。

    下面会写一个比较简单的例子:

    public static void main(String[] args) {
            CompletableFuture<String> completableFuture = new CompletableFuture<>();
    
            new Thread(()->{
                completableFuture.complete(Thread.currentThread().getName());
            }).start();
    
            doSomethingelse();//做你想做的其他操作
            
            try {
                System.out.println(completableFuture.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    

    用法上来说和Future有一点不同,我们这里fork了一个新的线程来完成我们的异步操作,在异步操作中我们会设置值,然后在外部做我们其他操作。在complete中会用CAS替换result,然后当我们get如果可以获取到值得时候就可以返回了。

    错误处理
    上面介绍了正常情况下但是当我们在我们异步线程中产生了错误的话就会非常的不幸,错误的异常不会告知给你,会被扼杀在我们的异步线程中,而我们的get方法会被阻塞。

    对于我们的CompletableFuture提供了completeException方法可以让我们返回我们异步线程中的异常,代码如下:

    public static void main(String[] args) {
            CompletableFuture<String> completableFuture = new CompletableFuture<>();
    
            new Thread(()->{
                completableFuture.completeExceptionally(new RuntimeException("error"));
                completableFuture.complete(Thread.currentThread().getName());
            }).start();
    
           doSomethingelse();//做你想做的耗时操作
    
            try {
                System.out.println(completableFuture.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    --------------
    输出:
    java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887)
        at futurepackge.jdk8Future.main(jdk8Future.java:19)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
    Caused by: java.lang.RuntimeException: error
        at futurepackge.jdk8Future.lambda$main$0(jdk8Future.java:13)
        at futurepackge.jdk8Future$$Lambda$1/1768305536.run(Unknown Source)
        at java.lang.Thread.run(Thread.java:745)
    

    在我们新建的异步线程中直接New一个异常抛出,在我们客户端中依然可以获得异常。

    工厂方法创建CompletableFuture

    我们的上面的代码虽然不复杂,但是我们的java8依然对其提供了大量的工厂方法,用这些方法更容易完成整个流程。如下面的例子:

    public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
                 return Thread.currentThread().getName();
            });
    
           doSomethingelse();//做你想做的耗时操作
    
            try {
                System.out.println(completableFuture.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    ---------
    输出:
    ForkJoinPool.commonPool-worker-1
    

    上面的例子通过工厂方法supplyAsync提供了一个Completable,在异步线程中的输出是ForkJoinPool可以看出当我们不指定线程池的时候会使用ForkJoinPool,而我们上面的compelte的操作在我们的run方法中做了,源代码如下:

    public void run() {
                CompletableFuture<T> d; Supplier<T> f;
                if ((d = dep) != null && (f = fn) != null) {
                    dep = null; 
                    fn = null;
                    if (d.result == null) {
                        try {
                            d.completeValue(f.get());
                        } catch (Throwable ex) {
                            d.completeThrowable(ex);
                        }
                    }
                    d.postComplete();
                }
            }
    

    上面代码中通过d.completeValue(f.get());设置了我们的值。同样的构造方法还有runasync等等。

    计算结果完成时的处理
    当CompletableFuture计算结果完成时,我们需要对结果进行处理,或者当CompletableFuture产生异常的时候需要对异常进行处理。有如下几种方法:

    public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
    public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)
    

    上面的四种方法都返回了CompletableFuture,当我们Action执行完毕的时候,future返回的值和我们原始的CompletableFuture的值是一样的。上面以Async结尾的会在新的线程池中执行,上面没有一Async结尾的会在之前的CompletableFuture执行的线程中执行。例子代码如下:

    public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(jdk8Future::getMoreData);
            Future<Integer> f = future.whenComplete((v, e) -> {
                System.out.println(Thread.currentThread().getName());
                System.out.println(v);
            });
            System.out.println("Main" + Thread.currentThread().getName());
            System.out.println(f.get());
        }
    

    exceptionally方法返回一个新的CompletableFuture,当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值,否则如果原始的CompletableFuture正常计算完后,这个新的CompletableFuture也计算完成,它的值和原始的CompletableFuture的计算的值相同。也就是这个exceptionally方法用来处理异常的情况。

    上面我们讨论了如何计算结果完成时进行的处理,接下来我们讨论如何对计算结果完成时,对结果进行转换。

    public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    

    这里同样也是返回CompletableFuture,但是这个结果会由我们自定义返回去转换他,同样的不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。Java的CompletableFuture类总是遵循这样的原则,下面就不一一赘述了。
    例子代码如下:

    public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                return 10;
            });
            CompletableFuture<String> f = future.thenApply(i ->i+1 ).thenApply(i-> String.valueOf(i));
            System.out.println(f.get());
        }
    

    上面的最终结果会输出11,我们成功将其用两个thenApply转换为String。

    计算结果完成时的消费
    上面已经讲了结果完成时的处理和转换,他们最后的CompletableFuture都会返回对应的值,这里还会有一个只会对计算结果消费不会返回任何结果的方法。

    public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)
    

    函数接口为Consumer,就知道了只会对函数进行消费,例子代码如下:

    public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                return 10;
            });
            future.thenAccept(System.out::println);
        }
    

    这个方法用法很简单我就不多说了.Accept家族还有个方法是用来合并结果当两个CompletionStage都正常执行的时候就会执行提供的action,它用来组合另外一个异步的结果。

    public <U> CompletableFuture<Void>  thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
    public <U> CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
    public <U> CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
    public     CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action)
    

    runAfterBoth是当两个CompletionStage都正常完成计算的时候,执行一个Runnable,这个Runnable并不使用计算的结果。 示例代码如下:

    public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                return 10;
            });
            System.out.println(future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
                return 20;
            }),(x,y) -> System.out.println(x+y)).get());
        }
    

    CompletableFuture也提供了执行Runnable的办法,这里我们就不能使用我们future中的值了。

    public CompletableFuture<Void>  thenRun(Runnable action)
    public CompletableFuture<Void>  thenRunAsync(Runnable action)
    public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)
    

    对计算结果的组合
    首先是介绍一下连接两个future的方法:

    public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
    public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
    public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
    

    对于Compose可以连接两个CompletableFuture,其内部处理逻辑是当第一个CompletableFuture处理没有完成时会合并成一个CompletableFuture,如果处理完成,第二个future会紧接上一个CompletableFuture进行处理。

    public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                return 10;
            });
            System.out.println(future.thenCompose(i -> CompletableFuture.supplyAsync(() -> { return i+1;})).get());
        }
    

    我们上面的thenAcceptBoth讲了合并两个future,但是没有返回值这里将介绍一个有返回值的方法,如下:

    public <U,V> CompletableFuture<V>   thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    public <U,V> CompletableFuture<V>   thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
    

    例子比较简单如下:

    public static void main(String[] args) throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                return 10;
            });
            CompletableFuture<String> f = future.thenCombine(CompletableFuture.supplyAsync(() -> {
                return 20;
            }),(x,y) -> {return "计算结果:"+x+y;});
            System.out.println(f.get());
        }
    

    上面介绍了两个future完成的时候应该完成的工作,接下来介绍任意一个future完成时需要执行的工作,方法如下:

    public CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
    public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
    public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
    public <U> CompletableFuture<U>     applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
    public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
    public <U> CompletableFuture<U>     applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
    

    上面两个是一个是纯消费不返回结果,一个是计算后返回结果。

    其他方法

    public static CompletableFuture<Void>       allOf(CompletableFuture<?>... cfs)
    public static CompletableFuture<Object>     anyOf(CompletableFuture<?>... cfs)
    

    allOf方法是当所有的CompletableFuture都执行完后执行计算。
    anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。

    相关文章

      网友评论

          本文标题:【JAVA】应用中的并行

          本文链接:https://www.haomeiwen.com/subject/xbwdvqtx.html