美文网首页Java 8
Java 函数式编程 之组合式异步编程

Java 函数式编程 之组合式异步编程

作者: e86dab508bc1 | 来源:发表于2019-04-10 23:42 被阅读0次

    我们这一篇介绍一下java8中的异步编程,其实在java5的时候,Future就已经被引入了,下面我们用一个例子说明下Future的简单使用,
    public class FutureExample {

    public static void main(String[] arg) throws ExecutionException, InterruptedException, TimeoutException {
    
        ExecutorService executorService = new ThreadPoolExecutor(2,10,30, TimeUnit.MINUTES,new ArrayBlockingQueue<>(10));
        Future<Integer> result = executorService.submit(() -> doSomething());
        System.out.println(result.get(10,TimeUnit.SECONDS));
    
    }
    
    
    public static Integer doSomething() throws InterruptedException {
        TimeUnit.SECONDS.sleep(7);//模拟时间占用
        return 666;
    
    }}
    

    这边我们使用了future来异步获得返回值,那我们考虑一下,如果我们现在有多个任务,任务之间还存在依赖关系的话,那么我们采用Future实现,代码是怎么样的呢,我们难免要使用Future提供给我们的isDone来判断,这样代码就会变得不简洁,接下来我们就引入一个新的类,CompleteFuture 来解决future不能解决的复杂需求.

    我们这边假设这样一个场景,收银员使用一套系统来获取输入的水果的打折之后的价格,按照一般的代码写法:

     public void delay() throws InterruptedException {
            TimeUnit.SECONDS.sleep(3);
      }
           /**
     * get the product price in synchronized way
     *
     * @param product
     * @return
     * @throws InterruptedException
     */
    public double getPrice(String product) throws InterruptedException {
        return calculatePrice(product);
    }
    
    
    /**
     * calculate the price.
     *
     * @param price
     * @return
     * @throws InterruptedException
     */
    public double calculatePrice(String price) throws InterruptedException {
        delay();//模拟数据库等操作
        return new Random(100).nextDouble() * 100;//模拟价格
    }
    
     /**
     * get the latest discount info
     *
     * @param product
     * @return
     */
    public double getLatestDiscountInfo(String product) throws InterruptedException {
        TimeUnit.SECONDS.sleep(2);
        return new Random(100).nextDouble();
    }
    

    那我们如何使用CompleteFuture来进行改造呢,看下面

       public CompletableFuture<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futureResult = new CompletableFuture<>();
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                futureResult.complete(price);
            } catch (InterruptedException e) {
                futureResult.completeExceptionally(e);
            }
        }).start();
        return futureResult;
    }
    

    看了一下这代码,感觉有点杂,单独开启一个线程来,如果请求很多,那么大量的线程将占用大量的资源,一种解决的办法是采用我们自己手工创建的线程池的方法,还有一种就是采用CompleteFuture给我们提供的静态方法,如下 :

        /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    

    改造之后:

     public CompletableFuture<Double> getPriceAsync(String product) {
        //we can refactor above code to below
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
            Double v = null;
            try {
                v = calculatePrice(product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return v;
        }, threadExecutor);
        return future;
    }
    

    再深入CompleteFuture, 我们观察到其内部还提供这些函数。

    Screen Shot 2019-04-10 at 11.17.01 PM.png

    我们可以猜测到,其功能可能远远不止我们上面介绍的那些,下面我们就简单的使用下其中一个方法。

      public static void testCombine() throws ExecutionException,InterruptedException {
    
        PriceService priceService = new PriceService();
    
        Future<Double> future = CompletableFuture.supplyAsync(() -> {
            return priceService.getPrice("apple");
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
                    return priceService.getPrice("Grape");
                }
        ), (priceOne, priceTwo) -> priceOne + priceTwo);
    
    
            System.out.println("The sum of the price of  apple and Grade is" +             future.get());
    
    
    }
    

    相关文章

      网友评论

        本文标题:Java 函数式编程 之组合式异步编程

        本文链接:https://www.haomeiwen.com/subject/apzwiqtx.html