Flutter 80: 初识 Flutter Stream (一

作者: 阿策神奇 | 来源:发表于2020-03-22 20:43 被阅读0次

          小菜在之前尝试 EventChannel 时曾经用到过 Stream 流数据,现在准备学习一下 flutter_bloc 时也主要用到 Stream 来做异步处理,于是简单学习一下何为 Stream

    A source of asynchronous data events.

          Stream 主要应用于 Flutter 的异步操作,在其他编程语言中也存在;Stream 提供了一种接受事件队列的方法,可通过 listen 进行数据监听,通过 error 接收失败状态,通过 done 来接收结束状态;

    1. Stream 创建

          Flutter 提供了多种创建 Stream 的方式;

    1.1 Stream.fromFuture(Future<T> future)

          Stream 通过 Future 创建新的单订阅流,当 Future 完成时会触发 data / error,然后以 done 事件结束;

    Future<String> getData() async {
      await Future.delayed(Duration(seconds: 3));
      return '当前时间为:${DateTime.now()}';
    }
    
    _streamFromFuture() {
      Stream.fromFuture(getData())
          .listen((event) => print('Stream.fromFuture -> $event'))
          .onDone(() => print('Stream.fromFuture -> done 结束'));
    }
    
    1.2 Stream.fromFutures(Iterable<Future<T>> futures)

          Stream 通过一系列的 Future 创建新的单订阅流,每个 Future 都会有自身的 data / error 事件,当这一系列的 Future 均完成时,Streamdone 事件结束;若 Futures 为空,则 Stream 会立刻关闭;其分析源码,很直接的看到是将每一个 Future 事件监听完之后才会执行的微事件结束;

    ====================================== 源码 ======================================
    factory Stream.fromFutures(Iterable<Future<T>> futures) {
        _StreamController<T> controller =
            new _SyncStreamController<T>(null, null, null, null);
        int count = 0;
        var onValue = (T value) {
          if (!controller.isClosed) {
            controller._add(value);
            if (--count == 0) controller._closeUnchecked();
          }
        };
        var onError = (error, StackTrace stack) {
          if (!controller.isClosed) {
            controller._addError(error, stack);
            if (--count == 0) controller._closeUnchecked();
          }
        };
        for (var future in futures) {
          count++;
          future.then(onValue, onError: onError);
        }
        // Use schedule microtask since controller is sync.
        if (count == 0) scheduleMicrotask(controller.close);
        return controller.stream;
    }
    ====================================== 测试 ======================================
    _streamFromFutures() {
      var data = [getData(), getData(), getData()];
      Stream.fromFutures(data)
          .listen((event) => print('Stream.fromFutures -> $event'))
          .onDone(() => print('Stream.fromFutures -> done 结束'));
    }
    
    1.3 Stream.fromIterable(Iterable<T> elements)

          Stream 通过数据集合中获取并创建单订阅流,通过 listen 监听迭代器中每一个子 element,当 Stream 监听到取消订阅或 Iterator.moveNext 返回 false / throw 异常 时停止迭代;

    _streamFromIterable() {
      var data = [1, 2, '3.toString()', true, false, 6];
      Stream.fromIterable(data)
          .listen((event) => print('Stream.fromIterable -> $event'))
          .onDone(() => print('Stream.fromIterable -> done 结束'));
    }
    
    1.4 Stream.periodic(Duration period, [T computation(int computationCount)])

          Stream 通过 Duration 对象作为参数创建一个周期性事件流,其中若不设置 computationonData 获取数据为 null;若没有事件结束则会一直周期性执行;

    _streamFromPeriodic() {
      Duration interval = Duration(seconds: 1);
      // onData 获取数据为 null
      Stream<int> stream = Stream<int>.periodic(interval);
      stream.listen((event) {
      print('Stream.periodic -> $event');
      }).onDone(() {
      print('Stream.periodic -> done 结束');
      });
    
      // onData 获取数据为 int 类型 data
      Stream<int> streamData = Stream<int>.periodic(interval, (data) => data);
      streamData.listen((event) {
        print('Stream.periodic -> $event');
        if (event >= 10) {}
      }).onDone(() {
        print('Stream.periodic -> done 结束');
      });
    }
    

    2. Stream 基本操作

    2.1 Stream<T> take(int count)

          take() 对于单订阅方式,可以提供 take 设置之前的 Stream 订阅数据,例如设置中断 Stream.periodic 周期展示次数;小菜粗略理解为 take 可以作为中断订阅,如果 take 设置次数大于 onDone 之前的订阅数据次数,Stream 依旧获取所有 onDone 之前的订阅数据;

    _streamFromPeriodic() {
      Duration interval = Duration(seconds: 1);
      Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
      streamData.take(5).listen((event) {
        print('Stream.periodic -> $event');
      }).onDone(() {
        print('Stream.periodic -> done 结束');
      });
    }
    
    _streamFromIterable() {
      var data = [1, 2, '3.toString()', true, false, 6];
      Stream.fromIterable(data)
          .take(8)
          .listen((event) => print('Stream.fromIterable -> $event'))
          .onDone(() => print('Stream.fromIterable -> done 结束'));
    }
    
    2.2 Stream<T> takeWhile(bool test(T element))

          takeWhile 也可以实现上述相同效果,通过 test 返回一个 boolean 类型,如果为 false 则中断订阅;

    _streamFromPeriodic() {
      Duration interval = Duration(seconds: 1);
      Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
      streamData.takeWhile((element) {
        print('Stream.periodic.takeWhile -> $element');
        return element <= 5;
      }).listen((event) {
        print('Stream.periodic -> $event');
      }).onDone(() {
        print('Stream.periodic -> done 结束');
      });
    }
    
    2.3 Stream<T> where(bool test(T event))

          where 用于在当前 Stream 中创建一个新的 Stream 用来丢弃不符合 test 的数据;小菜简单理解为类似数据库查询一样,仅过滤符合需求的数据流;且 where 可以设置多次;

    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
    streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element <= 5;
    }).where((event) {
      print('Stream.periodic.where -> $event');
      return event > 3;
    }).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });
    
    2.4 Stream<T> distinct([bool equals(T previous, T next)])

          distinct 小菜理解为相邻两个数据滤重;

    var data = [1, 2, '3.toString()', true, true, false, true, 6];
    Stream.fromIterable(data)
        .distinct()
        .listen((event) => print('Stream.fromIterable -> $event'))
        .onDone(() => print('Stream.fromIterable -> done 结束'));
    
    2.5 Stream<T> skip(int count)

          skip 用于跳过符合条件的订阅数据次数;

    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
    streamData.takeWhile((element) {
          print('Stream.periodic.takeWhile -> $element');
          return element <= 6;
        }).where((event) {
          print('Stream.periodic.where -> $event');
          return event > 2;
        })
        .skip(2).listen((event) {
          print('Stream.periodic -> $event');
        }).onDone(() {
          print('Stream.periodic -> done 结束');
        });
    
    2.6 Stream<T> skipWhile(bool test(T element))

          skipWhile 用于跳过在 where 符合条件下满足设置 test 条件的订阅数据;即当 testtrue 时跳过当前订阅数据监听;

    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
    streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element <= 6;
    }).where((event) {
      print('Stream.periodic.where -> $event');
      return event > 2;
    }).skipWhile((element) {
      print('Stream.periodic.skipWhile -> $element');
      return element <= 4;
    }).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });
    
    2.7 Stream<S> map<S>(S convert(T event))

          在当前 Stream 基础上创建一个新的 Stream 并对当前 Stream 进行数据操作,onData 监听到的是 map 变更后的新的数据流;

    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
    streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element <= 6;
    }).where((event) {
      print('Stream.periodic.where -> $event');
      return event > 2;
    }).skipWhile((element) {
      print('Stream.periodic.skipWhile -> $element');
      return element <= 4;
    }).map((event) {
      print('Stream.periodic.map -> $event -> ${event * 100}');
      return event * 100;
    }).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });
    
    2.8 Stream<S> expand<S>(Iterable<S> convert(T element))

          在当前 Stream 基础上创建新的 Stream 并将当前订阅数据转为新的订阅数据组onData 监听 数据组 中每个新的订阅数据元素;

    Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
    streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element <= 6;
    }).where((event) {
      print('Stream.periodic.where -> $event');
      return event > 2;
    }).skipWhile((element) {
      print('Stream.periodic.skipWhile -> $element');
      return element <= 4;
    }).expand((element) {
      print('Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');
      return [element, element * 10, element * 100];
    }).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });
    
    2.9 Future<int> get length

          Stream 监听订阅事件结束后,符合 where 条件的数量;

    _streamLength(index) async {
      Duration interval = Duration(seconds: 1);
      Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
      streamData = streamData.takeWhile((element) {
        print('Stream.periodic.takeWhile -> $element');
        return element <= 6;
      }).where((event) {
        print('Stream.periodic.where -> $event');
        return event > 2;
      }).skipWhile((element) {
        print('Stream.periodic.skipWhile -> $element');
        return element <= 4;
      });
      switch (index) {
        case 1:
          var length = await streamData.length;
          print('Stream.length -> $length');
          break;
        case 2:
          var isEmpty = await streamData.isEmpty;
          print('Stream.isEmpty -> $isEmpty');
          break;
        case 3:
          var isBroadcast = await streamData.isBroadcast;
          print('Stream.isBroadcast -> $isBroadcast');
          break;
        case 4:
          var first = await streamData.first;
          print('Stream.first -> $first');
          break;
        case 5:
          var last = await streamData.last;
          print('Stream.last -> $last');
          break;
      }
    }
    
    2.10 Future<bool> get isEmpty

          Stream 监听订阅事件结束后,统计是否符合 where 条件的订阅数据是否为空;

    _streamLength(2);
    
    2.11 Future<T> get first

          获取 Stream 符合条件的第一个订阅数据;

    _streamLength(4);
    
    2.12 Future<bool> get last

          获取 Stream 符合条件的最后一个订阅数据;

    _streamLength(5);
    
    2.13 Future<List<T>> toList()

          在 Stream 监听结束之后,将订阅数据存储在 List 中,该操作为异步操作;

    _streamToList() async {
      var data = [1, 2, '3.toString()', true, true, false, true, 6];
      Stream stream = Stream.fromIterable(data).distinct();
      List list = await stream.toList();
      if (list != null) {
        print('Stream.toList -> ${list}');
        for (int i = 0; i < list.length; i++) {
          print('Stream.toList -> ${i + 1} -> ${list[i]}');
        }
      }
    }
    
    2.14 Future<Set<T>> toSet()

          在 Stream 监听结束之后,将订阅数据存储在 Set 中,Set 可以过滤重复数据;

    _streamToSet() async {
      var data = [1, 2, '3.toString()', true, true, false, true, 6];
      Stream stream = Stream.fromIterable(data);
      Set set = await stream.toSet();
      if (set != null) {
        print('Stream.toSet -> ${set}');
      }
    }
    
    2.15 Future forEach(void action(T element))

          监听 Stream 中订阅数据,是对 listen 方式的一种监听;

    _streamForEach() {
      var data = [1, 2, '3.toString()', true, true, false, true, 6];
      Stream stream = Stream.fromIterable(data).distinct();
      stream.forEach((element) => print('Stream.forEach -> $element'));
    }
    

          小菜对 Stream 的尝试才刚刚开始,还有众多方法未曾尝试,对 Stream 的理解还很浅显,如有错误请多多指导!

    来源: 阿策小和尚

    相关文章

      网友评论

        本文标题:Flutter 80: 初识 Flutter Stream (一

        本文链接:https://www.haomeiwen.com/subject/leavyhtx.html