美文网首页Flutter
深耕flutter异步编程---flutter异步编程,看着一篇

深耕flutter异步编程---flutter异步编程,看着一篇

作者: CSDN老黑牛 | 来源:发表于2020-05-19 22:15 被阅读0次

    Isolate

    基本概念

    Isolate更像是一个进程,所有的dart代码都会在上面运行,其内部有一个线程,用来处理event loop,还有一块内存,并且这块内存是私有的,也就说两个Isolate不共享内存(这也是和Java 的Thread的主要区别),要是多个Isolate想共同工作,只能通过port来发送消息进行通信(一般会把特别耗时的操作,比如IO操作、图片压缩等容易引起main isolate卡顿的操作放到一个单独的Isolate中运行)。这样做的好处是分配内存和垃圾回收的时候不用对操作加锁,对于像flutter这种需要快速构建或者拆卸大量widget的应用来说很有帮助。


    两个独立运行的Isolate

    创建Isolate

    两种方式创建:

    1. 通过静态方法Isolate.spawn创建(常用)
    external static Future<Isolate> spawn<T>(
          void entryPoint(T message), T message,
          {bool paused: false,
          bool errorsAreFatal,
          SendPort onExit,
          SendPort onError,
          @Since("2.3") String debugName});
    

    entryPoint入参必须是一个顶层函数或者一个static方法,不接受一个函数表达式或者一个实例的方法,否则会Invalid argument(s): Isolate.spawn expects to be passed a static or top-level function的错误。

    1. 通过构造函数创建(几乎用不到)
    Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});
    

    使用这种方法创建Isolate必须自己传入pause和terminate能力,否则调用pausekill方法是没有任何效果的。

    Isolate间通信

    • 不同的Isolate之间通信是通过 ReceivePortSendPort来实现的。
    • 可以在Isoalte之间传输的数据类型有基础类型(null, num, bool, double,String)、SendPortListMapListMap元素的类型也必须是允许传输的类型,允许传递SendPort是实现双向通信的基础),实践中发现也能传递一些属性为基础类型的简单对象,在1.17.1版本中甚至能传递future对象。
    • ReceivePort 实现了Stream类,是一个非广播的Stream,所以只允许有一个监听者。
    • 通过看C的源码可以知道,每个Isolate其实对应一个线程,其背后是有一个线程池的。
    • 创建Isolate时会创建对应的堆空间、消息处理handler等等对应的信息。

    单向通信

    void startOnWay() async {
      ReceivePort receivePort = ReceivePort();
      Isolate isolate = await Isolate.spawn(oneWayEntryPoint, receivePort.sendPort);
      receivePort.listen((data) {
        print('$data');
        receivePort.close();//如果不关的,它会一直监听
        isolate?.kill(priority: Isolate.immediate);//杀死isolate
        isolate = null;
      });
    }
    
    void oneWayEntryPoint(SendPort sendPort) {
      sendPort.send('hello world!');
    }
    

    双向通信

    void startBothWay() async {
      ReceivePort receivePort = ReceivePort();
    
      await Isolate.spawn(entryPointBothWay, receivePort.sendPort);
    
      receivePort.listen((data) {
        if (data is SendPort) {
          data.send('hello child!');
        } else {
          print('parent: $data');
        }
      });
    }
    
    void entryPointBothWay(SendPort sendPort) {
      ReceivePort r = ReceivePort();
      sendPort.send(r.sendPort);
    
      r.listen((data) {
        print('child: $data');
        sendPort.send('hello parent!');
      });
    }
    

    compute函数

    在flutter中实现isolate间通信,可以直接使用更方便的API--compute函数。它实际上是对isolate之间通信的封装,每次调用后都会执行isolate.kill方法。demo如下:

    class Food {
      String name;
      double price;
    
      Food(this.name, this.price);
    }
    
    class AsyncDemo extends StatefulWidget {
      @override
      _AsyncState createState() => _AsyncState();
    }
    
    class _AsyncState extends State<AsyncDemo> {
      var food = Food('apple', 23.1);
    
      @override
      Widget build(BuildContext context) {
        return Scaffold(
          body: Center(
            child: Text(
              '''
              name:   ${food.name}
              price:  ${food.price}
              ''',
              style: TextStyle(
                fontSize: 22.0,
              ),
            ),
          ),
          floatingActionButton: FloatingActionButton(onPressed: () {
            compute<Food, Food>(handleCounter, food).then((value) {
              setState(() {
                food = value;
              });
            });
          }),
        );
      }
    }
    
    Food handleCounter(Food food) {
      return food..price = 1.8;
    }
    

    EventLoop

    通过Isolate我们知道dart app是个单线程app,那他是怎么实现异步操作的呢?答案是EventLoop和他相关联的两个队列--event queue、microtask queue。

    Event Queue

    event queue包含几乎所有的事件,比如IO操作、绘制操作、多个isolate之间通信、Future的部分操作等等。

    Microtask Queue

    这个队列用来执行用时很短的事件,这些事件会在把控制权交给Event Queue之前运行完毕。在实际开发中使用场景十分有限,整个Flutter源码里面总共有7处使用到了Microtask Queue(flutter中将事件提交到Microtask Queue上是使用 scheduleMicrotask api来操作的)。

    执行顺序

    执行顺序

    从上图可以看到,总是会先执行microtask,当microtask queue空的时候才会去执行event queue里的事件,每执行完一个event,都会重新去执行microtask queue里的事件。如果执行microtask里的事件消耗太多的时间,容易造成event queue阻塞,从而造成app的卡顿。
    看下面demo,

    import 'dart:async';
    main() {
      print('main #1 of 2');
      scheduleMicrotask(() => print('microtask #1 of 3'));
    
      new Future.delayed(new Duration(seconds:1),// 1
          () => print('future #1 (delayed)'));
    
      new Future(() => print('future #2 of 4')) // 2
          .then((_) => print('future #2a'))
          .then((_) {
            print('future #2b');
            scheduleMicrotask(() => print('microtask #0 (from future #2b)'));
          })
          .then((_) => print('future #2c')); // X
    
      scheduleMicrotask(() => print('microtask #2 of 3'));
    
      new Future(() => print('future #3 of 4')) // 3
          .then((_) => new Future(
                       () => print('future #3a (a new future)')))// 5
          .then((_) => print('future #3b'));// Y
    
      new Future(() => print('future #4 of 4')); // 4
      scheduleMicrotask(() => print('microtask #3 of 3'));
      print('main #2 of 2');
    }
    

    输出:

    main #1 of 2
    main #2 of 2 // a
    microtask #1 of 3
    microtask #2 of 3
    microtask #3 of 3 // b
    future #2 of 4
    future #2a
    future #2b // c
    future #2c
    microtask #0 (from future #2b)
    future #3 of 4
    future #4 of 4
    future #3a (a new future)
    future #3b
    future #1 (delayed)
    

    分析
    程序会从上到下一次执行main方法里的同步方法,所以会依此输出到标 a的位置。scheduleMicrotask会把任务提交到microtask queue里,future会被提交到event queue里,所以先执行scheduleMicrotask的操作,对应输出中标记 b 的地方。microtaks queue里的事件都被处理完了,接下来,eventloop开始处理event queue里的事件。按道理来讲代码里标记 1 2 3 4的future会被依此放到event queue队列中,但是标 1 的地方有一秒的延迟,所以标 1 的地方会在一秒后才会被放到event queue里,所以此时event queue里的事件排序是:2 3 4。先执行 2,依此输出到标记 c 的地方。这个时候看到了一个scheduleMicrotask,所以会把这个事件放到microtask的队列中去,此时这个future还未执行完,继续执行,输出future #2c。执行完毕后,event queue里的事件变成了 3 4,回去接着执行microtask queue里的事件,输出microtask #0 (from future #2b)。此时microtask queue里没有事件了,接着执行event queue里的事件,也就是事件 3,输出future #3 of 4,当执行到 5 的时候,又生成了一个新的future,放到了event queue里,事件3也就执行完了,此时event queue里的事件变成了 4(等待执行) 5(等待执行)。接着执行事件4,输出future #4 of 4,接着执行事件 5,依此输出future #3a (a new future) future #3b,最后经过一秒后事件1入队被执行,也就是最后输出的future #1 (delayed)。
    需要注意的是代码中标记X和Y的地方,标记X的地方,由于他的上一步并没有返回Future,所以标X的操作是在事件2后面进行的,而标记Y的地方,他上一步返回了一个Future对象,也就是事件 5,所以这个then函数里的操作是跟在事件5后面进行的。

    Future

    按官方文档上的介绍:它代表在将来某一时刻会获取到想要的值。当一个返回Future的函数被调用的时候,会发生两件事情:

    1. 这个Future对象会进去队列中排队等待被执行,此时会直接返回一个未完成状态的Future对象,但是这个函数是不进去队列的,它仅仅是被调用而已。
    2. 当值可获得的时候,Future处于一个叫 Completed with value 的状态,当执行失败时,处于一个叫做Completed with error的状态。

    Future的三种状态:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GQwBThcr-1589897339224)(/Users/jackshen/Pictures/1_UD63BMoIBmzoA6jo3LjCCg.png)]

    要想使用Future,可以直接使用其提供的api,也可以使用dart提供的语法糖async、await。

    async、await

    main() {
      fetchData();
    }
    
    void fetchData() async {
      print('fetch data start');
      await Future.delayed(Duration(seconds: 2));
      print('Data is returned!');
    }
    
    //输出:
    //fetch data start
    //Data is returned!(两秒后会输出)
    

    需要注意的是,在dart 2.0 之前,async中的同步代码不会运行,遇见async就直接返回一个未完成的Future对象。当被async标记的函数没有返回值时,会返回一个Future<void>对象,有返回值的话,返回一个Future<T>对象。

    Future API

    Future()

    void main() {
      var future = Future((){
        return 'hello world!';
      });
    
      future.then(print);
    }
    

    背后是通过Timer来实现的,直接看源码:

    factory Future(FutureOr<T> computation()) {
      _Future<T> result = new _Future<T>();
      Timer.run(() {
        try {
          result._complete(computation());
        } catch (e, s) {
          _completeWithErrorCallback(result, e, s);
        }
      });
      return result;
    }
    

    Future.delayed

      Future.delayed(Duration(seconds: 1), () {
        print('delayed 1');
      });
    

    1秒后输出相应值,其背后也是用的timer来实现,只不是传了一个duration对象过去。

    factory Future.delayed(Duration duration, [FutureOr<T> computation()]) {
      _Future<T> result = new _Future<T>();
      new Timer(duration, () {
        if (computation == null) {
          result._complete(null);
        } else {
          try {
            result._complete(computation());
          } catch (e, s) {
            _completeWithErrorCallback(result, e, s);
          }
        }
      });
      return result;
    }
    

    Future.value

    Future.value(1).then(print);
    

    背后调用的源码如下(只考虑value非Future的情况下):

    void _asyncCompleteWithValue(T value) {
      _setPendingComplete();
      _zone.scheduleMicrotask(() {
        _completeWithValue(value);
      });
    }
    

    可以看到背后并没有调用Timer相关api,使用的是scheduleMicrotask,也就是说Future.value函数是在mictotask queue里完成的,可以通过下面的例子验证:

    Future.delayed(Duration(seconds: 0)).then((value) => print('Future.delayed'));
    Future.value('Future.value').then(print);
    
    //输出:
    //Future.value  ---优先执行microtask queue里的事件。
    //Future.delayed
    

    Future.sync

    Future.sync(() => 'sync').then(print);
    

    同步执行代码,下面是源码实现:

    factory Future.sync(FutureOr<T> computation()) {
      try {
        var result = computation();
        if (result is Future<T>) {
          return result;
        } else {
          return new _Future<T>.value(result as dynamic);
        }
      } catch (error, stackTrace) {
        var future = new _Future<T>();
        AsyncError? replacement = Zone.current.errorCallback(error, stackTrace);
        if (replacement != null) {
          future._asyncCompleteError(replacement.error, replacement.stackTrace);
        } else {
          future._asyncCompleteError(error, stackTrace);
        }
        return future;
      }
    }
    

    要注意的是如果computation返回一个非future对象,会通过Future.value创建对象Future对象并返回,别忘了Future.value是在microtask 里完成的。再对比下下面两个例子的输出:

    Demo1:
    Future.delayed(Duration(seconds: 0)).then((value) => print('delayed'));
    Future.sync(() => Future.delayed(Duration(seconds: 0)))
        .then((value) => print('sync'));
    //输出:
    //delayed
    //sync
    
    Demo2:
    Future.delayed(Duration(seconds: 0)).then((value) => print('delayed'));
    Future.sync(() => 'sync').then(print);
    //输出:
    //sync
    //delayed
    

    Future.wait

    final f1 = Future.delayed(Duration(seconds: 1)).then(
      (value) => 'first future.',
    );
    
    final f2 = Future.delayed(Duration(seconds: 2)).then(
      (value) => 'second future.',
    );
    
    Future.wait([f1, f2]).then((value) => print(value));
    //输出:
    //[first future., second future.]
    

    等待所有的future完成并返回一个包含所有future执行结果的list,一旦有一个future执行过程中出现了error,最后的结果都不会输出。源码实现:

    static Future<List<T>> wait<T>(Iterable<Future<T>> futures,
        {bool eagerError: false, void cleanUp(T successValue)}) {
      final _Future<List<T>> result = new _Future<List<T>>();
      List<T> values; // Collects the values. Set to null on error.
      int remaining = 0; // How many futures are we waiting for.
      var error; // The first error from a future.
      StackTrace stackTrace; // The stackTrace that came with the error.
    
      // Handle an error from any of the futures.
      // TODO(jmesserly): use `void` return type once it can be inferred for the
      // `then` call below.
      handleError(Object theError, StackTrace theStackTrace) {
        remaining--;
        if (values != null) {
          if (cleanUp != null) {
            for (var value in values) {
              if (value != null) {
                // Ensure errors from cleanUp are uncaught.
                new Future.sync(() {
                  cleanUp(value);
                });
              }
            }
          }
          values = null;
          if (remaining == 0 || eagerError) {
            result._completeError(theError, theStackTrace);
          } else {
            error = theError;
            stackTrace = theStackTrace;
          }
        } else if (remaining == 0 && !eagerError) {
          result._completeError(error, stackTrace);
        }
      }
    
      try {
        // As each future completes, put its value into the corresponding
        // position in the list of values.
        for (var future in futures) {
          int pos = remaining;
          future.then((T value) {
            remaining--;
            if (values != null) {
              values[pos] = value;
              if (remaining == 0) {
                result._completeWithValue(values);
              }
            } else {
              if (cleanUp != null && value != null) {
                // Ensure errors from cleanUp are uncaught.
                new Future.sync(() {
                  cleanUp(value);
                });
              }
              if (remaining == 0 && !eagerError) {
                result._completeError(error, stackTrace);
              }
            }
          }, onError: handleError);
          // Increment the 'remaining' after the call to 'then'.
          // If that call throws, we don't expect any future callback from
          // the future, and we also don't increment remaining.
          remaining++;
        }
        if (remaining == 0) {
          return new Future.value(const []);
        }
        values = new List<T>(remaining);
      } catch (e, st) {
        // The error must have been thrown while iterating over the futures
        // list, or while installing a callback handler on the future.
        if (remaining == 0 || eagerError) {
          // Throw a new Future.error.
          // Don't just call `result._completeError` since that would propagate
          // the error too eagerly, not giving the callers time to install
          // error handlers.
          // Also, don't use `_asyncCompleteError` since that one doesn't give
          // zones the chance to intercept the error.
          return new Future.error(e, st);
        } else {
          // Don't allocate a list for values, thus indicating that there was an
          // error.
          // Set error to the caught exception.
          error = e;
          stackTrace = st;
        }
      }
      return result;
    }
    

    整体来说很简单,可以看到内部维护了一个future的列表,当全部执行完毕后返回结果,如果出错就直接返回错误信息。

    Future.microtask

    看起一个microtask的简单写法,用法同scheduleMicrotask,内部也是通过scheduleMicrotask来实现的,直接看源码吧:

    factory Future.microtask(FutureOr<T> computation()) {
      _Future<T> result = new _Future<T>();
      scheduleMicrotask(() {
        try {
          result._complete(computation());
        } catch (e, s) {
          _completeWithErrorCallback(result, e, s);
        }
      });
      return result;
    }
    

    异常处理

    async、await的异常处理

    async、await的异常处理和同步代码的异常处理是一样的,采用try catch的方法,同样也是可以使用finally的:

    main() {
      errorDemo();
    }
    
    void errorDemo() async {
      try {
        await Future.delayed(Duration(seconds: 1)).then(
          (value) => throw '####Error####',
        );
      } catch (e) {
        print(e);
      } finally {
        print('finally compeleted.');
      }
    }
    // 输出:
    // ####Error####
    // finally compeleted.
    

    Future API 的异常处理

    方法简介

    Future api提供两种方式来处理异常,一个是Future<R> then<R>(FutureOr<R> onValue(T value), {Function onError}); 里的onError回调,另一个是Future<T> catchError(Function onError, {bool test(Object error)});

    • 使用onError回调

      main() {
        Future.delayed(Duration(seconds: 1), () {
          throw '#####Error#####';
        }).then(print, onError: (error) {
          print('has error');
        }).whenComplete(() => print('has complete!'));
      }
      // 输出:
      // has error
      // has complete!
      

      可以看到并没有报错,而是按照预期依此调用了onError、whenCompelte(相当于try catch里的 finally)回调。这有一个小细节需要注意的是,onError和onValue是同级的,当onValue里面发生异常时是不会走同级的onError回调的,onValue和onError是服务于一个future的。

    • 使用catchError

      Future.delayed(Duration(seconds: 1), () {
        throw '#####Error#####';
      }).catchError((error){
        print('has error');
      });
      // 输出:
      // has error
      

      catchError和onError回调的用法差不多,最后也是输出了has error。看他的api,发现还有一个叫做test的可选命名参数,它是做什么的呢?只有当test返回true的时候才会走catchError回调,返回为false时会直接报错。他的入参是error对象,所以可以在test里面做一些特殊处理,比如error类型的过滤等等,默认情况下test始终返回true。

      main() {
        Future.delayed(Duration(seconds: 1), () {
          throw '#####Error#####'; // 1
        }).catchError((String error) {
          print('has error');
        }, test: (error) {
          if (error is String) {
            return true;
          }
          return false;
        });
      }
      

      可以把标1的地方改成 throw 1再运行下试试效果。

    onError和catchError的优先级问题

    main() {
      Future.delayed(Duration(seconds: 1), () {// 1
        throw '##### Error #####'; // 2
      }).then(print, onError: (error) { // 3
        print('onError callback'); // 4
      }).catchError((error) { 
        print('catchError callback');
      });
    }
    // 输出:
    // onError callback.
    

    运行完delayed方法后,会返回一个Future,叫他Future1,标记 3 处的onError里的回调只对Future 1负责,Future 1爆了异常所以会走到onError回调里,也就是标记 4 的地方,一旦onError捕获了异常就不会再去执行catchError里的代码。如果说标 3 的地方并没有设置onError回调,才会往catchError回调里走。

    链式调用的异常处理

    链式调用中,一旦某个环节爆了异常,下面的每一个节点都会返回相同的错误,直到遇到catchError回调或者被下面的某个节点的onError回调拦截,直接看官网的一个例子吧:

    Future<String> one() => new Future.value("from one");
    
    Future<String> two() => new Future.error("error from two");
    
    Future<String> three() => new Future.value("from three");
    
    Future<String> four() => new Future.value("from four");
    
    void main() {
      one() // Future completes with "from one".
          .then((_) => two()) // Future completes with two()'s error.
          .then((_) => three()) // Future completes with two()'s error.
          .then((_) => four()) // Future completes with two()'s error.
          .then((value) => Future.value(1)) // Future completes with two()'s error.
          .catchError((e) {
        print("Got error: ${e.error}"); // Finally, callback fires.
        return 42; // Future completes with 42.
      }).then((value) {
        print("The value is $value");
      });
    }
    
    // Output of this program:
    //   Got error: error from two
    //   The value is 42
    

    同步、异步混合异常处理

    无论是async、await,还是Future,都是异步执行的,上面介绍的错误处理方式只能处理异步的异常,那么同步代码和异步代码混合的情况下该怎么处理异常呢?看下面的例子:

    void main() {
      getLegalErpLength().then(print).catchError((error) {
        print('error happen!');
      });
    }
    
    Future<int> getLegalErpLength() {
      final erp = findErpFromDb(); // 1
      return Future.value(erp).then(parseErp);
    }
    
    int parseErp(String erp) {
      return erp.length;
    }
    
    String findErpFromDb() {
      throw 'unknown error';
    }
    
    //输出:
    Unhandled exception:
    unknown error
    #0      findErpFromDb 
    #1      getLegalErpLength 
    #2      main
    #3      _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:301:19)
    #4      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:168:12)
    
    Process finished with exit code 255
    

    getLegalErpLength是一个异步方法,里面调用了同步方法:findErpFromDb,当findErpFromDb出现异常的时候getLegalErpLength时无法进行捕获的,所以直接输出了上面的错误栈。那如果想捕获findErpFromDb的异常应该怎么写呢?需要引入Future.sync ,修改如下:

    Future<int> getLegalErpLength() {
      return Future.sync(() {
        final erp = findErpFromDb();
        return Future.value(erp).then(parseErp);
      });
    }
    // 输出:
    // error happen!
    

    使用Future.sync将同步方法和异步方法包装一下,这样就完美的解决了上面的问题。

    Future原理简介[1]

    • Future()、Future.delayed()

      背后使用的timer来实现,具体的源码可以参考dart sdk中的timer_impl.dart文件。简单来讲,是用一个_TimerHeap的数据结构来存取timer,内部采用堆排序算法对其进行排序,当预先设置的时间到时,有一个eventhandler的东西负责处理设置的回调。

    • Future.value()、Future.microtask()

      内部层层调用后,最终调用scheduleMicrotask方法,将其放到microtask queue中执行。内部也维护了一个loop,将每一个回调封装成_AsyncCallbackEntry放到loop中,然后循环执行,直到loop为空。具体可以参考schedule_microtask.dart文件。

    创建Future的另外一种方式:Completer

    completer可以用来创建future,相比使用future,completer可以自己指定future的完成时机。在开发中,能使用future的地方就直接使用future,它比completer更易读,除非有些情况下拿不到想要的future对象,可以使用completer来替代,比如下面这个获取图片长宽的例子:

    class CompleterDemo extends StatelessWidget {
      Widget build(BuildContext context) {
        Image image = new Image.network('https://img01.e23.cn/2020/0519/20200519022445869.jpg');
        Completer<ui.Image> completer = new Completer<ui.Image>();
        image.image.resolve(new ImageConfiguration()).addListener(ImageStreamListener(
          (image, synchronousCall) {
            completer.complete(image.image);
          },
        ));
        return new Scaffold(
          appBar: new AppBar(
            title: new Text("Image Dimensions Example"),
          ),
          body: new ListView(
            children: [
              new FutureBuilder<ui.Image>(
                future: completer.future,
                builder: (BuildContext context, AsyncSnapshot<ui.Image> snapshot) {
                  if (snapshot.hasData) {
                    return new Text(
                      '${snapshot.data.width}x${snapshot.data.height}',
                      style: Theme.of(context).textTheme.display3,
                    );
                  } else {
                    return new Text('Loading...');
                  }
                },
              ),
              image,
            ],
          ),
        );
      }
    }
    

    Stream

    相比future,stream可以用来处理数据流。Stream分为两种,一种是single subscription streams,只允许监听一次,另外一种是Broadcast streams,可以允许监听多次。直接看Stream的API你会发现,这东西和RxJava实在是太像了,一些API的名字和用法几乎差不多,所以这里不再对Stream的常用API进行梳理,感兴趣的直接去看官方文档吧,对RxJava有了解的同学学起来是很容易的。

    创建Stream

    • Stream自带的一些factory构造函数,比如Stream.fromFuture(Future<T> future)、Stream.fromIterable(Iterable<T> elements)等等。以fromIterable为例:

      Stream.fromIterable([1, 2]).listen((event) {
        print('event is $event');
      });
      
      // 输出:
      // event is 1
      // event is 2
      
    • 异步生成器函数:async* and yield(那对应的同步生成器函数使用的是 sync* and yield)

      main() async {
        print('Start...');
        await for (int number in generateNumbers(2)) { // 1
          print(number);
        }
        print('End');
      }
      
      Stream<int> generateNumbers(int maxCount) async* {
        print('yield output: ');
        for (int i = 0; i < maxCount; i++) {
          yield i; // 2
        }
      
        print('yield* output: ');
        yield* createNumbers(maxCount); // 3
      }
      
      Stream<int> createNumbers(int maxCount) async* {
        for (int i = 0; i <= maxCount; i++) {
          yield i;
        }
      }
      
      
      // 输出:
      // Start...
      // yield output: 
      // 0
      // 1
      // yield* output: 
      // 0
      // 1
      // 2
      // End
      
      

      需要注意两点:

      1、异步生成器的产物Stream可以使用await for进行遍历,await for必须实在async标记的函数中才会有效,另外同步生成器的产物Iterable可以使用for进行遍历。

      2、代码中标记2的地方使用的是 yield,标记3的地方使用的是yield*,他俩的区别是一个生成单个值,一个生成一串值,也就是一个Stream。

    • 使用StreamController创建Stream

      使用StreamController创建Stream可以在任何地方、任何时间添加event并处理event,但是和async相比要复杂的多,最重要的一点是async创建的Stream不会马上执行,当第一个注册者注册的时候才会执行,但是利用StreamController创建的Stream则不然,看下面的代码:

      main() async {
        var counterStream = timedCounter(const Duration(seconds: 1), 5);
        await Future.delayed(Duration(seconds: 3), () {
          print('3 seconds has gone.');
        });
        counterStream.listen((value) {
          print('$value -- ${DateTime.now()}');
        }); 
      }
      
      Stream<int> timedCounter(Duration interval, [int maxCount]) {
        var controller = StreamController<int>();
        int counter = 0;
        void tick(Timer timer) {
          counter++;
          controller.add(counter); // Ask stream to send counter values as event.
          if (maxCount != null && counter >= maxCount) {
            timer.cancel();
            controller.close(); // Ask stream to shut down and tell listeners.
          }
        }
      
        Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
        return controller.stream;
      }
      // 输出:
      3 seconds has gone.
      1 -- 2020-05-19 21:52:20.843943
      2 -- 2020-05-19 21:52:20.847416
      3 -- 2020-05-19 21:52:20.847438
      4 -- 2020-05-19 21:52:21.831694
      5 -- 2020-05-19 21:52:22.828021
      

    可以看到前三个几乎是同时输出的,这是因为StreamController一旦创建Stream,里面的代码就开始执行,生成的数据会缓存下来,当有注册的时候立马把缓存输出给监听者,为了避免这种情况,可以使用StreamController各种回调方法进行限制,上面的代码修改如下:

    Stream<int> timedCounter(Duration interval, [int maxCount]) {
      StreamController controller;
      int counter = 0;
      Timer timer;
    
      void tick(_) {
        counter++;
        controller.add(counter);
        if (maxCount != null && counter >= maxCount) {
          timer.cancel();
          controller.close();
        }
      }
    
      void startTimer() {
        timer = Timer.periodic(interval, tick);
      }
    
      controller = StreamController<int>(
        onListen: () {
          startTimer();
        },
      );
      return controller.stream;
    }
    
    //输出:
    3 seconds has gone.
    1 -- 2020-05-19 22:02:32.488114
    2 -- 2020-05-19 22:02:33.479319
    3 -- 2020-05-19 22:02:34.477549
    4 -- 2020-05-19 22:02:35.476372
    5 -- 2020-05-19 22:02:36.481389
      
    

    StreamController还有onCancel、onPause、onResume三个回调,用来处理取消、暂停、继续等操作。

    相关文章

      网友评论

        本文标题:深耕flutter异步编程---flutter异步编程,看着一篇

        本文链接:https://www.haomeiwen.com/subject/fhaiohtx.html