美文网首页
Flutter学习之Dart异步编程

Flutter学习之Dart异步编程

作者: echoSuny | 来源:发表于2020-04-19 15:59 被阅读0次

    1 isolate机制

    dart是基于单线程模型的语言。但是在开发中我们经常会进行耗时操作,比如网络请求。这种耗时操作会阻塞我们的代码,所以dart也有并发机制,名叫isolate。app的启动入口main函数就是一个类似Android主线程的一个主isolate。和java的Thread不同的是,dart中的isolate无法共享内存。
    isolate的英文直译就是隔离,孤立,脱离。在这里我们可以先简单的将其理解为java中的线程。后续根据代码来一步步弄清它到底是什么?

    void main(){
      Isolate.spawn(entryPoint, "hello");
    }
    
    void entryPoint(String msg){
        print(msg);
    }
    

    Isolate.spawn()方法的第一个参数需要我们传入一个方法(不清楚的可以看另一篇文章Flutter学习之Dart方法,异常与类)。于是我们在下面定义了一个方法entryPoint()并需要传入String类型的数据(数据类型可以是任意的类型),那么我们在Isolate.spawn()的第二个参数就要传入对应类型的数据。

    上面的main()函数我们就可以理解成Android中的主线程,下面的entryPoint就相当于是自线程。而 Isolate.spawn()就相当于创建了这个自线程。

    void main(){
      Isolate.spawn(entryPoint, "hello");
      print("你好");
    }
    
    void entryPoint(String msg){
        sleep(Duration(seconds:10));
        print(msg);
    }
    

    可以看到我们在entryPoint()中睡眠了10秒,但是主线程依然会继续执行且没有阻塞。
    前面我们说了Isolate是线程隔离的,那么怎么证明呢?

    int i;
    void main(){
      i =10;
      Isolate.spawn(entryPoint, "hello");
      print(i);
    }
    
    void entryPoint(String msg){
        sleep(Duration(seconds:10));
        print(i);
    }
    

    我们声明了一个全局的 i 且没有赋值。在主线程中我们赋于其10。但是根据最后打印的结果可以看到在entryPoint()中打印的却是null。这就证明了dart中的Isolate确实是线程隔离的。但是我们如果需要两个“线程”之间要相互通信该怎么办呢?我们知道在Android中我们可以通过Handler来进行通信,可惜的是dart没有提供Handler,但是提供了ReceivePort和SendPort。

    void main(){
      var rp = ReceivePort();
      rp.listen((t) {
          print(t);
       });
      rp.sendPort.send("hello");
      rp.sendPort.send("dart");
      rp.sendPort.send(66);
    }
    

    根据上面的代码我们可以得知SendPort是在ReceivePort里面的,SendPort一发送数据,ReceivePort就可以通过listen()函数监听到数据。listen函数中的 t 是没有类型的,那么就表示它是dart中的 dynamic 类型的,可以接收任何类型。但是上图中我们看到被黄色框框住的地方。这个地方是红色的表示我们的程序正在运行,如果运行完了的话就会自动变成灰色。可我们明明已经发送完数据了,程序怎么还没有结束呢?这是因为我们忘了添加一句代码:

    rp.close();

    如果你现在在最后添加上这句代码,反而会看不到任何的输出。这与后面的消息驱动机制有关,暂且先不说,稍后会介绍。只要大家记得发送完了要记得close()就可以了,就像java中使用IO流一样。
    上面是我们在一个线程中使用了ReceivePort和SendPort。下面就在main()和entryPoint()之间来使用:

    void main(){
    var rp = ReceivePort();
      rp.listen((t) {
          print(t);
       });
      Isolate.spawn(entryPoint, rp.sendPort);
    }
    
    void entryPoint(SendPort sendPort){
         sendPort.send("echo");
    }
    

    可以看到这样就完成了从“子线程” entryPoint到“主线程”main之间的通信了。 现在只是单向通信,如果要完成双向通信我们就还需要从main向entryPoint发送。和之前的子到主是一样的,我们只需要在entryPoint中把“子线程”的SendPort发给“主线程”,那么就可以在“主线程”给“子线程”发消息了。

    void main(){
    // 主线程的ReceivePort
    var rp = ReceivePort();
      rp.listen((t) {
          if( t is SendPort){
              // 可以在这里发送消息给子线程
          }
       });
      Isolate.spawn(entryPoint, rp.sendPort);
    }
    
    void entryPoint(SendPort sendPort){
          // 子线程的ReceivePort和SendPort
         var rp = ReceivePort();
         var sp = rp.sendPort;
          
         sendPort.send(sp)
         sendPort.send("echo");
    
        rp.listen((t) {
           // 这里可以接收主线程发来的消息
       });
    }
    

    2 Future

    dart的事件驱动流程

    前面我们说了dart中也是靠事件驱动(消息驱动)来推动整个程序的运行。我们都知道在Android中每个线程只有一个Looper和其对应的MessageQueue。但是在Dart中有两个队列:EventQueue(事件队列)和MicroTaskQueue(微任务队列):



    整个程序的运行时靠Event-Loop先从MicroTaskQueue中取出任务执行,当MicroTaskQueue中没有任务了再去EventQueue中取任务执行。当EventQueue执行完取出的这一个普通任务之后就会重新去检查MicroTaskQueue中是否有新的任务。如果有就执行MicroTaskQueue中的任务,没有则继续EventQueue中的任务。也就是说MicroTaskQueue中的任务优先级是高于EventQueue中的,即MicroTaskQueue中的任务可以插队在EventQueue中的任务之前。

    void main(){
      var rp = ReceivePort();
      rp.listen((t) {
          print(t);
       });
      rp.sendPort.send("hello --- 1");
      Future.microTask((){
          print("micro task execute --- 1")
      });
    
      rp.sendPort.send("hello --- 2");
      Future.microTask((){
          print("micro task execute --- 2")
      });
    }
    

    从执行结果来看微任务的优先级确实是要高于普通的任务,也跟我们的流程图上画的一样。
    了解了dart的整个事件的驱动流程之后我们来认识一下Future:
    Future的中文意思是未来,与其对应的就是java当中的FutureTask。在dart中就意味着使用Future创建的任务会在未来被执行。其实Future有很多的api,除了上面我们介绍过的Future.microTask()是向MicroTaskQueue中添加任务之外,其他的则是向EventQueue中添加。

    void main(){
        Future f = Future.delayed(Duration(seconds : 3));
        f.then((t){
          print(t);
        });
    }
    

    是不是跟我们Android中的Handler很像?上面的代码表示延迟3秒之后会得到一个未来的Future对象并执行。但是不意味着3秒过后会立即执行,因为你不能确定Queue中是否有其他的任务,其他的任务是不是会阻塞。所以print(t)方法的执行事件是会在3秒或大于3秒之后执行。

    void main(){
    Future<String> f =  new File("C:\user\test\a.txt").readAsString();
    f.then((String text){
        print(text);
    });
    }
    

    上面的例子我们是从一个文件中读取内容。可以看到readAsString()返回的是一个带有明确泛型类型的Future,也就是把读取文件内容的这么一个任务放入了队列中。那么我们就可以通过then()方法在未来的某个时间点得到文件中的内容。那如果我们在读取的过程中出现了异常该怎么办呢?很简单,只需要在后面添加一个方法就可以了:

    void main(){
    Future<String> f =  new File("C:\user\test\a.txt").readAsString();
    f.then((String text){
        print(text);
    }).catchError((e,s){
        // 捕获异常
    });
    }
    

    通过查看Future的源码得知Future的then()方法返回的是一个Future对象,而then方法中我们传递的匿名方法是可以有返回值的。

    void main(){
    Future<String> f =  new File("C:\user\test\a.txt").readAsString();
    f.then((String text){
        print(text);
        return 6;
    }).then((int i){
         print(i);
    });
    }
    

    可以看到当我们返回一个int类型的6时我们在调用then()方法的时候需要我们传入一个参数为int类型的匿名方法。如果不好理解可以拆开来理解:

    Future<String> f =  new File("C:\user\test\a.txt").readAsString();
    Future<int> then = f.then((String text){
        print(text);
        return 6;
    });
    then.then((int i){
        print(i);
    })
    
    // wait表示一组任务(我们传入了一个Future集合)执行完之后再做统一的一个处理
    // 具体场景的话可以认为同时去请求几个接口,但是需要每个都成功返回之后再去做后面的处理
    Future.wait([Future.delayed(Duration(seconds : 2), Future.delayed(Duration(seconds : 2)]).
    then((t){
        print("------")
    });
    // 按顺序执行
    Future.foreach([1,2,3,4]), ((t){
        print(t);
    });
    

    这其实就和Rxjava中的操作符一样,可以根据不同的需求做不同的事情。

    3 Stream

    Stream(流)在dart中也经常出现,表示发出一系列的异步数据。Stream是一个异步数据源,它是dart中处理异步事件流的统一api。
    与Future只能表示一次在未来异步获得的数据,而Stream可以表示多次异步获得的数据。比如IO处理的时候,File的readAsString()是一次性读取整个文件的内容进来,而Stream可以一部分一部分的读取。如果文件过大,使用Future显然会出现内存问题。

    单订阅模式
    void main(){
        // 返回一个Stream对象 泛型微List<int> 由于dart中没有byte类型,所以会用int来表示
        Stream<List<int>> stream =  new File("C:\user\test\a.txt").openRead();
        // 返回一个订阅者
        StreamSubscription subscriber = stream.listen((t){
            print("stream"); // 这里会多次调用
        });
    }
    

    在调用stream.listen()之后我们可以看到返回了一个订阅者StreamSubscription。那么既然是订阅者那么就对应的有取消订阅,任务完成等之类的回调:

    subscriber.cancel(); // 取消
    subscriber.onDone((){ //任务完成之后会调用此方法 
    });
    subscriber.onData((t){ // 此方法中的匿名方法会完全替代listen中的匿名方法。也就是说listen中的匿名方法不会再调用
    });
    subscriber.pause(); // 暂停
    
    subscriber.resume(); // 继续
    

    还有比较多的方法,这里就不一一展示了。

    多订阅模式

    如果采用上面的用法,那么只能调用一次stream.listen(),再次调用的话运行就会报错。幸运的是Stream支持多订阅,就跟和被观察者一样可以被多个观察者订阅。

    void main(){
        Stream<List<int>> stream =  new File("C:\user\test\a.txt").openRead();
        // 转成广播模式
        var broadcastStream = stream.asBroadcastStream();
        broadcastStream.listen((t){
            print("stream1");
        });
    
        broadcastStream.listen((t){
            print("stream2");
        });
    
        broadcastStream.listen((t){
            print("stream3");
        });
    }
    

    上面的广播是由单订阅转换成广播的,其实在dart中可以直接创建广播:

    void main(){
         var controller = StreamController.broadcast();
          //发送一个广播
         controller.add("echo");
         // controller.stream直接创建的就是广播
         controller.stream.listen((t){
            print(t);
         });
    }
    

    运行之后很遗憾的是没有监听到数据。这是因为如果是这种创建的广播需要先订阅,才能接收数据。也就是说必须先订阅再接收,所以需要把发送和监听这两行代码调换一下位置。

    void main(){
        var stream = Stream.fromIterable([1,2,3]);
        var broadcastStream = stream.asBroadcastStream();
        broadcastStream.listen((t){
            print("subscriber 1 :  ${t}");
        });
    
        broadcastStream.listen((t){
            print("subscriber 2 :  ${t}");
        });
        // ----------------------
         var controller = StreamController.broadcast();
         controller.add("echo");
         controller.stream.listen((t){
            print("controller :  ${t}");
         });
        // ----------------------
        broadcastStream.listen((t){
            print("subscriber 3 :  ${t}");
        });
    }
    

    上半部分是由单订阅的Stream转换成了多订阅的广播,中间部分则是和上一段代码一样直接创建了广播,最后则是由转换而来的广播又在controller这个直接创建的广播发送数据之后又去监听了一下。


    根据运行结果我们可以看到转换的广播是可以收到controller发出的数据的。由此我们可以发现转换的广播有些类似于粘性广播,可以在广播发出之后再订阅依然可以收到数据。

    3 async/await

    使用async和await的代码是异步的,但是看起来却很像同步代码。前面学习Future的时候我们知道可以使用then()方法实现先执行完一个操作之后再执行另外一个操作。但是利用async和await可以很好的解决这种频繁会调的问题。
    假设现在有一个需求要求我们按顺序的读取文件内容,就是A文件读完了再去读B文件,以此类推知道读完所有的文件。如果我们用Future该怎么实现呢?

    void main(){
        new File("xxx\xx\a.txt").readAsString()
      .then((s){
          return  new File("xxx\xx\b.txt").readAsString();
      }).then((s){
          return  new File("xxx\xx\c.txt").readAsString();
      }).then......
    
    }
    

    可以看到我们需要不停的调用then来进行回调。虽然写起来也不是很麻烦,但是我们有更优雅的方式为什么不用呢?

    Future<String> readFile() async {
       String text1 = await new File("xxx\xx\a.txt").readAsString();
       String text2 = await new File("xxx\xx\b.txt").readAsString();
       String text3 = await new File("xxx\xx\c.txt").readAsString();
        ......
      return "text${text2}${text3}......";
    }
    
    void main(){
        readFile().then((String text){
            print(text);
        });
    }
    
    

    首先async和await是两个关键字。被async修饰的方法表示这是一个异步方法,返回值只能是void或者Future。await必须用在async方法中,表示转成同步执行(串行),也就是等待上一个执行完再执行下一个。

    相关文章

      网友评论

          本文标题:Flutter学习之Dart异步编程

          本文链接:https://www.haomeiwen.com/subject/edynvhtx.html