Java异步

作者: 愤怒的老照 | 来源:发表于2020-05-21 19:21 被阅读0次

    1、同步、异步、阻塞、非阻塞

    理解Java的异步调用,还需要从这四个基础概念理解;如果要从Java方面来理解这四个概念,需要对Java的IO模型有了解,Java 对于 IO 的封装分为 BIO、NIO 和 AIO,BIO 对应的是阻塞同步 IO,NIO 和 AIO 对应的都是非阻塞同步 IO。下面梳理一下对这几个概念的理解。https://www.jianshu.com/p/0efedb229e98

    一般的系统无法避免IO依赖,无论是从本地磁盘或从网络读取,是远程通信和本地文件读写的核心。但是由于IO的速度比CPU慢很多,所以IO成了系统的瓶颈,也就很有必要学习IO模型,避免瓶颈,提升性能。

    1.1 Linux IO模型

    首先从操作系统层面来说,以Linux为例,Linux内核把所有的外来设备当做一个文件,对本地文件有file descriptor处理,对网络文件有socket file descriptor处理,所以可以把二者一起分析。

    文件读取设计两个部分,一个是用户进程,一个是内核,用户进程是没有权利操作系统资源的,所以用户进程需要通过系统调用来调用内核,从而达到文件读取的目的。整个过程分三步:

    • 用户进程调用read方法向内核发起读请求
    • 内核将要读取的信息复制到缓冲区
    • 内核将数据从缓冲区复制到用户进程空间

    从这个步骤来看,阻塞非阻塞是针对调用方法来说的,如果用户进程在内核将数据准备完成前一致处于阻塞等待状态,就是阻塞的;如果内核还没有准备好,用户调用后直接返回,并没有阻塞,此时就是非阻塞的。。。同步异步是针对内核数据到用户进程这一过程看的,同步是指读写事件就绪后(一般是采用轮询检查),用户进程再自己负责进行读写的操作,内核向用户进程复制数据的过程仍然是阻塞的;如果是内核将数据复制到用户进程,并在复制完成后通知用户,用户没有主动检查数据准备状态,就是异步的,同步和异步的本质区别是,内核数据复制到用户空间时,用户进程是否进行等待。

    Linux中用的比较多的是IO多路复用模型,I/O多路复用可以监视多个描述符,一旦某个描述符读写操作就绪,便可以通知程序进行相应的读写操作,这个模型也是同步IO。

    1.2 Java IO模型

    下面来看Java中的IO模型Java对于IO的封装分为BIO、NIO和AIO。BIO对应的是阻塞同步IO,NIO和AIO对应的都是非阻塞同步IO。

    1.2.1 BIO

    Java中的BIO对应的是同步阻塞IO。BIO针对每一个客户端请求都建立一个Socket连接,当客户端没有数据时线程一直处于阻塞状态。这种IO模型的优点是简单,但是服务端的线程个数和客户端并发访问数呈正比,如果访问增多,线程也会增多导致OOM;可以通过线程池来解决,但是线程池可以接受的线程数量也是由系统决定的,并且线程上下文切换会导致CPU使用率不高,所以BIO不适合大规模使用。
    BIO请求代码如下:

    {
     ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
    
     ServerSocket serverSocket = new ServerSocket();
     serverSocket.bind(8088);
     while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
     Socket socket = serverSocket.accept();
     executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
    }
    
    class ConnectIOnHandler extends Thread{
        private Socket socket;
        public ConnectIOnHandler(Socket socket){
           this.socket = socket;
        }
        public void run(){
          while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
              String someThing = socket.read()....//读取数据
              if(someThing!=null){
                 ......//处理数据
                 socket.write()....//写数据
              }
    
          }
        }
    }
    

    1.2.2 NIO

    NIO对应的是同步非阻塞IO。NIO有几个关键的概念,selector、channel、buffer。

    • buffer是包含需要读取或写入的数据的缓冲区。NIO中所有数据的读写均通过缓冲区进行操作。相对于BIO流的好处是可以重复读取,并且可以双向通信。
    • channel是一个双向的数据读写的通道,buffer就是建立在channel的基础上流通的。
    • selector是NIO的核心,通过不断轮询注册在其之上的channel,当selector发现某个channel有数据状态有变化时,所以只需要一个线程,就可以解决所有的连接请求。

    NIO代码如下:

     interface ChannelHandler{
          void channelReadable(Channel channel);
          void channelWritable(Channel channel);
       }
       class Channel{
         Socket socket;
         Event event;//读,写或者连接
       }
    
       //IO线程主循环:
       class IoThread extends Thread{
       public void run(){
       Channel channel;
       while(channel=Selector.select()){//选择就绪的事件和对应的连接,阻塞的不需要担心cpu空转
          if(channel.event==accept){
             registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
          }
          if(channel.event==write){
             getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
          }
          if(channel.event==read){
              getChannelHandler(channel).channelReadable(channel);//如果可以读,则执行读事件
          }
        }
       }
       Map<Channel,ChannelHandler> handlerMap;//所有channel的对应事件处理器
      }
    

    NIO由原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。

    并且由于线程的节约,连接数大的时候因为线程切换带来的问题也随之解决,进而为处理海量连接提供了可能。

    1.2.3 AIO

    还不会。。。。。

    2、Java代码中的异步

    上面说的是Java IO模型的同步异步,在业务代码中可不可以体现异步呢?答案是肯定的。

    2.1 同步代码

    public class Main {                                                                          
        public static void main(String[] args) {                                                 
                                                                                                 
            long begin = System.currentTimeMillis();                                             
                                                                                                 
            calculate();                                                                         
            long end = System.currentTimeMillis();                                               
            System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));             
        }                                                                                        
                                                                                                 
        public static  void calculate(){                                                         
            // 模拟耗时                                                                              
            try {                                                                                
                TimeUnit.SECONDS.sleep(5);                                                       
            } catch (InterruptedException e) {                                                   
                e.printStackTrace();                                                             
            }                                                                                    
                                                                                                 
            System.out.println("计算完成");                                                          
        }                                                                                        
    }                                                                                            
    

    代码中调用方必须要等到被调用方执行结束后,才可以继续执行。

    2.2 基于Future的异步调用

    Future 模式相当于一个占位符,代表一个操作的未来的结果,其简单的概念不在本文中介绍,直接给出总结:Future 模式可以细分为将来式和回调式两种模式。

    2.2.1 将来式

    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            ExecutorService service = Executors.newFixedThreadPool(10);
            long begin = System.currentTimeMillis();
            Future<String> future = service.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                   return calculate();
                }
            });
            long end = System.currentTimeMillis();
            System.out.println("do otherthing");
            System.out.println(future.get());
            System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));
        }
    
        public static  String calculate(){
            // 模拟耗时
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return "100";
        }
    }
    

    提交任务到线程池后,主线程可以继续做自己的事情,但是future.get()还是阻塞的,所以如果在提交任务后直接get,并不会提高效率,反而由于线程的开销会比同步调用更慢。除了get,还可以通过isDone方法轮询检查是否计算完成,本质上和get没有什么区别。

    所以Future虽然可以实现异步,但是还是有缺点,对比JavaScript的异步,不满足一下几点:

    • 结果的获取还是通过get阻塞的方式,更好的方式是采用回调
    • 如果多个Future形成依赖,会产生回调地狱,如果可以链式调用就更好了。

    这种方式在Future的第二种回调式中声明了。

    2.2.2回调式

    这种方式就是回调,但是Future 并没有实现 callback,addListener 这样的方法,想要在 JAVA 中体验到 callback 的特性,得引入一些额外的框架。有Netty、Guava第三方框架可以使用, jdk1.8 已经提供了一种更为高级的回调方式:CompletableFuture。

    2.2.2.1 Guava-ListeningExecutorService
    public class Main {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            long begin = System.currentTimeMillis();
            ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
            ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
                public Integer call() throws Exception {
                    System.out.println("执行耗时操作...");
                    calculate();
                    return 100;
                }
            });
    
            Futures.addCallback(future, new FutureCallback<Integer>() {
    
                @Override
                public void onSuccess(Integer result) {
                    System.out.println("计算成功:" + result);
                }
    
                @Override
                public void onFailure(Throwable t) {
                    System.out.println("计算失败:" + t.getMessage());
                }
            });
    
            long end = System.currentTimeMillis();
            System.out.println("do otherthing");
            System.out.println(String.format("耗时%ss", String.valueOf(end - begin)));
    
    
        }
    
        public static  String calculate(){
            // 模拟耗时
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return "100";
        }
    }
    

    这下可以彻底不用管结果了,😌😌😌,但是还是有链式调用的问题,所以还是看看CompletableFuture的用法,这是一个非常好用的异步编程类

    2.3 JDK-CompletableFuture

    2.3.1初始化

    有几个常用的初始化静态工厂方法:

    CompletableFuture.runAsync(Runnable runnable);
    CompletableFuture.runAsync(Runnable runnable, Executor executor);
    
    CompletableFuture.supplyAsync(Supplier<U> supplier);
    CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
    
    • runAsync 方法接收的是 Runnable 的实例,意味着它没有返回值
    • supplyAsync 方法对应的是有返回值的情况
    • 这两个方法的带 executor,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 ForkJoinPool.commonPool() 线程池中执行的,里面是守护线程。

    2.3.2 任务执行

    //前三个任务 A 无返回值,所以对应的,第 2 行和第 3 行代码中,resultA 其实是 null。
    CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
    CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 
    CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
    
    // thenRun(Runnable runnable),任务 A 执行完执行 B,并且 B 不需要 A 的结果。
    CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
    //thenAccept(Consumer action),任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 不返回值。
    CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
    //thenApply(Function fn),任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值。
    CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
    

    解释在注释里,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。

    2.3.3 异常处理

    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
    

    看一个例子:

    CompletableFuture.supplyAsync(() -> "resultA")
        .thenApply(resultA -> resultA + " resultB")
        .thenApply(resultB -> resultB + " resultC")
        .thenApply(resultC -> resultC + " resultD");
    

    如果第一个出现了异常,其他的都不能执行,那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
       throw new RuntimeException();
    }).exceptionally(ex -> "errorResultA")
      .thenApply(resultA -> resultA + " resultB")
      .thenApply(resultB -> resultB + " resultC")
      .thenApply(resultC -> resultC + " resultD");
    
    System.out.println(future.join());
    

    上面的代码中,任务 A 抛出异常,然后通过 .exceptionally() 方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:

    errorResultA resultB resultC resultD
    

    2.3.4 聚合任务

    CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
    CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
    
    //thenAcceptBoth 表示后续的处理不需要返回值
    cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
    // thenCombine 表示需要返回值。
    cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
    //不需要result
    cfA.runAfterBoth(cfB, () -> {});
    

    2.3.5 取多个任务的结果

    // N个Future全部完成才可以
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
    // 只需要满足一个就可以
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...}
    
    CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");
    CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123);
    CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");
    
    CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC);
    // 所以这里的 join() 将阻塞,直到所有的任务执行结束
    future.join();
    

    2.3.6 compose

    与thenCombine不同的是,compose的后一个是基于前一个的结果,而thenCombine只是把两个的结果组合在一起。

    CompletableFuture参考:https://www.javadoop.com/post/completable-future

    相关文章

      网友评论

        本文标题:Java异步

        本文链接:https://www.haomeiwen.com/subject/uxvtahtx.html