美文网首页
Dart中的Stream初步研究

Dart中的Stream初步研究

作者: xmb | 来源:发表于2021-09-02 14:45 被阅读0次

    官方文档中文翻译:异步编程:使用 stream

    1、stream是什么

    Stream是一些列异步事件的序列。

    2、Stream的接收

    ① 使用异步for循环

    /// 使用异步for循环来接收所有的stream
    Future<int> sumStream(Stream<int> stream) async {
      var sum = 0;
      await for(var value in stream) {
        sum += value;
      }
      return sum;
    }
    

    ② 使用streamlisten来监听

    3、stream的种类

    ① 单订阅streamSingle-Subscription

    只包含一个事件序列,事件需要按顺序提供,不能丢失。比如读取一个文件,接收一个网页。

    ② 广播streamBroadcast

    针对单个消息的,一次处理一个消息。比如浏览器的鼠标事件。
    可以在任何时候监听,可以添加多个监听,还可以随时取消监听。

    4、处理stream的方法

    Future<T> get first;
    Future<bool> get isEmpty;
    Future<T> get last;
    Future<int> get length;
    Future<T> get single;
    Future<bool> any(bool Function(T element) test);
    Future<bool> contains(Object? needle);
    Future<E> drain<E>([E? futureValue]);
    Future<T> elementAt(int index);
    Future<bool> every(bool Function(T element) test);
    Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
    Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
    Future forEach(void Function(T element) action);
    Future<String> join([String separator = '']);
    Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
    Future pipe(StreamConsumer<T> streamConsumer);
    Future<T> reduce(T Function(T previous, T element) combine);
    Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
    Future<List<T>> toList();
    Future<Set<T>> toSet();
    

    5、修改stream的方法

    修改stream会产生一个新的stream,在原来stream上添加的监听,会转到新的stream上,如果新的stream结束了,会转到原来的stream上。

    详细参考文章:Flutter Stream简介及部分操作符使用

    // 将一个Stream转为元素都为R类型的Stream
    Stream<R> cast<R>();
    
    // 把Stream中的每一个元素,转换为一个序列sequence,比如元素1转为[1, 1]
    Stream<S> expand<S>(Iterable<S> Function(T element) convert);
    
    // 按map的实现规则,转换Stream中的每一个元素成为一个新的Stream
    Stream<S> map<S>(S Function(T event) convert);
    
    // 跳过前面count个事件
    Stream<T> skip(int count);
    
    // 根据传入的条件规则,进行跳过
    Stream<T> skipWhile(bool Function(T element) test);
    
    // 指定只发送count个事件
    Stream<T> take(int count);
    
    // 只发送指定条件的事件
    Stream<T> takeWhile(bool Function(T element) test);
    
    // 用条件丢弃一些元素,创建一个新的Stream
    Stream<T> where(bool Function(T event) test);
    
    ...
    

    6、listen方法

      StreamSubscription<T> listen(void onData(T event)?,
          {Function? onError, void onDone()?, bool? cancelOnError});
    

    7、stream的创建

    ① 从其他stream转换
    以上提到的修改stream的方法

    ② 使用异步生成器(async*)生成stream

      /// 异步生成器(async*) 生成 stream
      /// 通过 yield 和 yield* 向stream提交事件
      Stream<int> createAsyncStream() async* {
        int num = 0;
        while (true) {
          // 间隔1秒钟
          await Future.delayed(Duration(seconds: 1));
          // 将num运算后的值放入stream
          yield num++;
          // 终止条件
          if (num == 10) break;
        }
      }
    

    ③ 使用StreamController来创建

        var streamController = StreamController<int>(
          onListen: () {},
          onResume: () {},
          onPause: () {},
          onCancel: () {},
        );
    

    比如,event_bus的实现,使用广播stream

      EventBus({bool sync = false})
          : _streamController = StreamController.broadcast(sync: sync);
    

    8、实际应用场景

    event_bus 插件的使用

    相关文章:Flutter EventBus 的使用和底层实现分析

    1. 初始化EventBus,会创建一个通过广播方式初始化的StreamController
    2. 订阅监听,on方法返回的是一个streamstreamlisten方法传入的对监听到事件的处理方法。
    3. fire方法实现,是通过往StreamController中添加自定义的事件。

    Bloc插件的使用

    StreamBuilder组件

    9、相关的语法关键字(await、async、sync、async、yield、yield*)

    await用于等待异步方法返回数据

    async用于异步方法

    sync*多元素同步函数生成器,返回Iterable<T>

    async*多元素异步函数生成器,返回Stream<T>

    yield发送的是一个元素

    yield*操作的是一个IterableStream

    具体使用例子,如下:

    import 'dart:async';
    
    import 'package:flutter/material.dart';
    
    void main() {
      runApp(MyApp());
    }
    
    class MyApp extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return MaterialApp(
          title: 'Flutter Demo',
          theme: ThemeData(
            primarySwatch: Colors.blue,
          ),
          home: StreamPage(),
        );
      }
    }
    
    class StreamPage extends StatefulWidget {
      const StreamPage({Key? key}) : super(key: key);
    
      @override
      _StreamPageState createState() => _StreamPageState();
    }
    
    class _StreamPageState extends State<StreamPage> {
      late Stream<String> _stream;
    
      @override
      void initState() {
        super.initState();
    
        _stream = fetchEmojiStream(10);
      }
    
      @override
      Widget build(BuildContext context) {
        return Scaffold(
          appBar: AppBar(),
          body: Column(
            children: [
              Column(
                children: [
                  MaterialButton(
                    onPressed: test1,
                    child: Text('测试1'),
                  ),
                  MaterialButton(
                    onPressed: test2,
                    child: Text('测试2'),
                  ),
                  MaterialButton(
                    onPressed: test3,
                    child: Text('测试3'),
                  ),
                  MaterialButton(
                    onPressed: test4,
                    child: Text('测试4'),
                  ),
                  MaterialButton(
                    onPressed: test5,
                    child: Text('测试5'),
                  ),
                ],
              ),
              StreamBuilder(
                stream: _stream,
                builder: _builder,
              ),
            ],
          ),
        );
      }
    
      Widget _builder(BuildContext context, AsyncSnapshot snapshot) {
        switch (snapshot.connectionState) {
          case ConnectionState.none:
            break;
          case ConnectionState.waiting:
            return CircularProgressIndicator();
          case ConnectionState.active:
            return Text(snapshot.requireData);
          case ConnectionState.done:
            return Text(snapshot.requireData);
        }
        return Container();
      }
    
      void test1() async {
        getEmoji(10).forEach(print);
        /*
        flutter: 👿
        flutter: 💀
        flutter: 💁
        flutter: 💂
        flutter: 💃
        flutter: 💄
        flutter: 💅
        flutter: 💆
        flutter: 💇
        flutter: 💈
         */
      }
    
      void test2() {
        getEmojiWithTime(10).forEach(print);
        /*
        flutter: 👿-2021-09-02T16:41:24.557020
        flutter: 💀-2021-09-02T16:41:24.558755
        flutter: 💁-2021-09-02T16:41:24.559007
        flutter: 💂-2021-09-02T16:41:24.559160
        flutter: 💃-2021-09-02T16:41:24.559338
        flutter: 💄-2021-09-02T16:41:24.559515
        flutter: 💅-2021-09-02T16:41:24.559663
        flutter: 💆-2021-09-02T16:41:24.559846
        flutter: 💇-2021-09-02T16:41:24.560122
        flutter: 💈-2021-09-02T16:41:24.560372
         */
      }
    
      void test3() {
        fetchEmoji(0).then(print);
        /*
        flutter: 👿
         */
      }
    
      void test4() {
        fetchEmojiStream(10).listen(print);
        /* 每个一秒生成一个
        flutter: 👿
        flutter: 💀
        flutter: 💁
        flutter: 💂
        flutter: 💃
        flutter: 💄
        flutter: 💅
        flutter: 💆
        flutter: 💇
        flutter: 💈
         */
      }
    
      void test5() {
        fetchEmojiWithTime(10).forEach(print);
        /*
        flutter: 👿-2021-09-02T17:15:21.591821
        flutter: 💀-2021-09-02T17:15:22.595980
        flutter: 💁-2021-09-02T17:15:23.601100
        flutter: 💂-2021-09-02T17:15:24.608355
        flutter: 💃-2021-09-02T17:15:25.610364
        flutter: 💄-2021-09-02T17:15:26.612006
        flutter: 💅-2021-09-02T17:15:27.619303
        flutter: 💆-2021-09-02T17:15:28.623427
        flutter: 💇-2021-09-02T17:15:29.626712
        flutter: 💈-2021-09-02T17:15:30.629052
         */
      }
    
      /// 多元素同步
      /// 多元素同步函数生成器(sync*),返回Iterable
      /// yield 发送一个元素
      Iterable<String> getEmoji(int count) sync* {
        Runes first = Runes('\u{1f47f}');
        for (int i = 0; i < count; i++) {
          yield String.fromCharCodes(first.map((e) => e + i));
        }
      }
    
      /// 多元素同步
      /// 多元素同步生成器(sync*),返回Iterable
      /// yield* 操作的是一个Iterable
      Iterable<String> getEmojiWithTime(int count) sync* {
        yield* getEmoji(10).map((e) => '$e-${DateTime.now().toIso8601String()}');
      }
    
      /// 单元素异步
      /// await async
      Future<String> fetchEmoji(int count) async {
        await Future.delayed(Duration(milliseconds: 1000));
        return String.fromCharCodes(Runes('\u{1f47f}').map((e) => e + count));
      }
    
      /// 多元素异步
      /// 多元素异步生成器(async*),返回Stream
      /// yield操作的是一个元素
      Stream<String> fetchEmojiStream(int count) async* {
        for (int i = 0; i < count; i++) {
          yield await fetchEmoji(i);
        }
      }
    
      /// 多元素异步
      /// 多元素异步生成器(async*),返回Stream
      /// yield*操作的是一个Stream
      Stream<String> fetchEmojiWithTime(int count) async* {
        yield* fetchEmojiStream(count).map((event) => '$event-${DateTime.now().toIso8601String()}');
      }
    }
    

    相关文章

      网友评论

          本文标题:Dart中的Stream初步研究

          本文链接:https://www.haomeiwen.com/subject/sezqwltx.html