美文网首页
四、线程池及异步编程

四、线程池及异步编程

作者: kar_joe | 来源:发表于2019-12-28 16:16 被阅读0次

    线程池ThreadPool

    任务队列+多线程
    生产者-消费者模式


    image.png
    ThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler) 
    
    // 提交 Runnable 任务
    Future<?> 
      submit(Runnable task);
    // 提交 Callable 任务
    <T> Future<T> 
      submit(Callable<T> task);
    // 提交 Runnable 任务及结果引用  
    <T> Future<T> 
      submit(Runnable task, T result);
    

    2、CompletionService,线程池+结果队列

    image.png

    示例1:
    同时发三个请求,收到一个即可

    // 创建线程池
    ExecutorService executor =
      Executors.newFixedThreadPool(3);
    // 创建 CompletionService
    CompletionService<Integer> cs =
      new ExecutorCompletionService<>(executor);
    // 用于保存 Future 对象
    List<Future<Integer>> futures =
      new ArrayList<>(3);
    // 提交异步任务,并保存 future 到 futures
    futures.add(
      cs.submit(()->geocoderByS1()));
    futures.add(
      cs.submit(()->geocoderByS2()));
    futures.add(
      cs.submit(()->geocoderByS3()));
    // 获取最快返回的任务执行结果
    Integer r = 0;
    try {
      // 只要有一个成功返回,则 break
      for (int i = 0; i < 3; ++i) {
        r = cs.take().get();
        // 简单地通过判空来检查是否成功返回
        if (r != null) {
          break;
        }
      }
    } finally {
      // 取消所有任务
      for(Future<Integer> f : futures)
        f.cancel(true);
    }
    // 返回结果
    return r;
    

    示例2:

    // 创建线程池
    ExecutorService executor =
      Executors.newFixedThreadPool(3);
    // 创建 CompletionService
    CompletionService<Integer> cs = new
      ExecutorCompletionService<>(executor);
    // 异步向电商 S1 询价
    cs.submit(()->getPriceByS1());
    // 异步向电商 S2 询价
    cs.submit(()->getPriceByS2());
    // 异步向电商 S3 询价
    cs.submit(()->getPriceByS3());
    // 将询价结果异步保存到数据库
    // 并计算最低报价
    AtomicReference<Integer> m =
      new AtomicReference<>(Integer.MAX_VALUE);
    for (int i=0; i<3; i++) {
      executor.execute(()->{
        Integer r = null;
        try {
          r = cs.take().get();
        } catch (Exception e) {}
        save(r);
        m.set(Integer.min(m.get(), r));
      });
    }
    return m;
    

    问题:组合操作,竟态条件

    Integer o; 
    do{ 
           o= m. get(); 
           if(o<=r){ break;}
    } 
    while(! m. compareAndSet( o, r)); 
    //或这么改:
    for (int i=0; i<3; i++) {
           Integer r = logIfError(()->cs.take().get());
           executor.execute(()-> save(r));
           m.getAndUpdate(v->Integer.min(v, r));
    }
    return m.get();
    

    异步编程(CompletableFuture)

    常见任务关系:串行、并行、and汇聚、or汇聚


    image.png

    异步编程可以用于上述模型,并且代码语义清晰,更专注业务逻辑
    示例:


    image.png
    Future实现版本:
    image.png
    // T1Task 需要执行的任务:
    // 洗水壶、烧开水、泡茶
    class T1Task implements Callable<String>{
      FutureTask<String> ft2;
      // T1 任务需要 T2 任务的 FutureTask
      T1Task(FutureTask<String> ft2){
        this.ft2 = ft2;
      }
      @Override
      String call() throws Exception {
        System.out.println("T1: 洗水壶...");
        TimeUnit.SECONDS.sleep(1);
        
        System.out.println("T1: 烧开水...");
        TimeUnit.SECONDS.sleep(15);
        // 获取 T2 线程的茶叶  
        String tf = ft2.get();
        System.out.println("T1: 拿到茶叶:"+tf);
    
        System.out.println("T1: 泡茶...");
        return " 上茶:" + tf;
      }
    }
    // T2Task 需要执行的任务:
    // 洗茶壶、洗茶杯、拿茶叶
    class T2Task implements Callable<String> {
      @Override
      String call() throws Exception {
        System.out.println("T2: 洗茶壶...");
        TimeUnit.SECONDS.sleep(1);
    
        System.out.println("T2: 洗茶杯...");
        TimeUnit.SECONDS.sleep(2);
    
        System.out.println("T2: 拿茶叶...");
        TimeUnit.SECONDS.sleep(1);
        return " 龙井 ";
      }
    }
    

    CompletableFuture版本:


    image.png
    // 任务 1:洗水壶 -> 烧开水
    CompletableFuture<Void> f1 = 
      CompletableFuture.runAsync(()->{
      System.out.println("T1: 洗水壶...");
      sleep(1, TimeUnit.SECONDS);
    
      System.out.println("T1: 烧开水...");
      sleep(15, TimeUnit.SECONDS);
    });
    // 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
    CompletableFuture<String> f2 = 
      CompletableFuture.supplyAsync(()->{
      System.out.println("T2: 洗茶壶...");
      sleep(1, TimeUnit.SECONDS);
    
      System.out.println("T2: 洗茶杯...");
      sleep(2, TimeUnit.SECONDS);
    
      System.out.println("T2: 拿茶叶...");
      sleep(1, TimeUnit.SECONDS);
      return " 龙井 ";
    });
    // 任务 3:任务 1 和任务 2 完成后执行:泡茶
    CompletableFuture<String> f3 = 
      f1.thenCombine(f2, (__, tf)->{
        System.out.println("T1: 拿到茶叶:" + tf);
        System.out.println("T1: 泡茶...");
        return " 上茶:" + tf;
      });
    // 等待任务 3 执行结果
    System.out.println(f3.join());
    

    相关文章

      网友评论

          本文标题:四、线程池及异步编程

          本文链接:https://www.haomeiwen.com/subject/gspsoctx.html