美文网首页
Flutter中的Stream初探

Flutter中的Stream初探

作者: emdd2016 | 来源:发表于2019-08-28 16:40 被阅读0次

    Stream: 超级抽象的一个XXX

    Stream 的分类

    (1) "Single-subscription" -- 单订阅流
    (2) "broadcast" -- 广播式的流(可多订阅)

    • 单订阅流只能被订阅一次,重复订阅会报错, 直到设置listen 后才会发送。单订阅流通常用于流式数据块较大的连续数据,如文件I/O。
    • 广播式的 可以订阅多次,在listen之前的数据会丢失。
      - 如果一个流是单订阅模式 却想多次订阅,可以通过asBroadcastStream ()方法来修改。

    Stream 的创建

    (1) 从集合中创建一个新的单订阅流, Stream.fromIterable
    Stream stream1 = Stream.fromIterable([11, 22, 33]);
    (2) 从Future中创建一个新的单订阅流, Stream.fromFuture
    Stream stream = Stream.fromFuture(Future(()=> 1));
    (3) 通过Stream.fromFutures创建
    Stream stream3 = Stream.fromFutures([
    Future(() => 111),
    Future(() => 111),
    ]);
    (4) 创建一个每隔自定义时间发送一个数据的流
    Stream stream2 = Stream.periodic(Duration(seconds: 2), (a) {
    print("a------>$a");
    if (a < 3)
    return a;
    else
    return a *10;
    });
    它有2 个参数, 第一个是间隔的时间, 第二个是每次发送数据前的回调方法,可以通过这个方法的返回值来修改流的值。 (返回值即向流中添加的内容)
    上面创建的流是一个固定间隔时间无限发送的流,这就有问题了。 正常情况下谁会去搞一个无限发送数据流的功能呢? 怎样控制它的结束?-----------
    用stream的take 方法。 stream2 = stream2.take(5); 这样就只会发送5次了。
    升级版的takeWhile () : 可以用来做筛选

    stream2 = stream2.takeWhile((data) {
     return data < 10; // 设置一个上限为10 
    });
    
    image.png

    为了方操作 Stream ,官方提供了StreamController;如上图所示,他提供StreamSink来添加流 (入口),同时又提供 stream 属性用于对外的监听和变换。 stream.listen的返回时一个StreamSubscription,可以通过它的pause(),resume(),cancel()等方法来操作流的订阅。

    StreamController:
    StreamController controller = StreamController<String>(); // 创建一个单订阅流
    StreamController controller = StreamController.broadcast(); // 创建一个广播式的订阅流
    参数sync用来指定是同步还是异步。

    listen : 用来设置监听, 它的返回值是 StreamSubscribe。
    StreamSubscribe:
    pause() : 暂停监听(是立即暂停),暂停后的事件流不会丢失,会在resume后一起回调
    resume(): 唤醒pause的流
    cancel(): 取消
    举个栗子🌰

     // 1. StreamControl
    StreamController controller = StreamController<String>();
    // 2. StreamSink
    StreamSink sink = controller.sink;
    // 3. Stream
    Stream stream = controller.stream;
    stream.transform(StreamTransformer<String, String>.fromHandlers(
        handleData: (String data, EventSink<String> sink) {
      // 在这里设置transform 是没有用的,不会走这里; 除非在stream.transform返回的stream上加listen监听。
      if (!data.contains("数据2")) {
        sink.add(data);
      }
    }));
    sink.add("3秒后才设置监听。");
    // 4. subscribe
    Timer(Duration(seconds: 3), () {
      StreamSubscription subscription = controller.stream.transform(
          StreamTransformer<String, String>.fromHandlers(
              handleData: (String data, EventSink<String> sink) {
        print("transform");
        if (!data.contains("数据3")) {
          sink.add(data);
        }
      })).listen((event) {
        print("接收到新的消息: " + event);
      });
      sink.add("我是一条新的数据"); 
      Timer(Duration(milliseconds: 100), () {
        sink.add("pause...");
        subscription.pause(); // 暂停
        sink.add("我是一条新的数据pause"); 
      });
    
      Timer(Duration(seconds: 5), () {
        subscription.resume();
    
        sink.add("我是一条新的数据2"); 
      });
    });
    

    输出结果: 绿色的先输出,过5秒后黄色的才输出


    image1.png

    那么问题来了, 为什么pause 之前的add的那一个流没有输出呢? 跟进去看源码就明白了,pause 期间是不会分发事件的。


    image2.png
    schedule 的实现
    image3.png

    这样也就明白了, Stream最终是 想microtask queue 中添加了一个microtask 来实现异步的功能。

    说点题外的: flutter是单线程,他的异步实现是通过Event Looper 来实现的。Event looper 中包含2个队列: (1)MicorTask Queue (2) Event Queue , MicroTask 的优先级是大于Event Queue的,只有所有的MicroTask Queue中的任务都完成以后才会去执行Event Queue中的内容。

    当然 StreamController 可以是同步的,只要在创建的时候将参数sync设置为true即可,sync: true

    如何通过Stream来实现响应式的组件 ?

    通过StreamBuilder

    看个例子:

    class StreamModel {
      StreamController _controller;
    
      StreamSink<List<BookResponseData>> _sink;
    
      Stream<List<BookResponseData>> stream;
    
      StreamModel() {
        // 构造方法中初始化流相关的对象
        _controller = StreamController<List<BookResponseData>>.broadcast();
        _sink = _controller.sink;
        stream = _controller.stream;
      }
      /// 获取书本列表
      getBookList() async {
        var httpClient = new HttpClient();
        var uri = new Uri.https('www.apiopen.top', '/novelApi');
        var request = await httpClient.getUrl(uri);
        var response = await request.close();
        var responseBody = await response.transform(utf8.decoder).join();
        // 将获取到的字符串转换成定义好的Book实体类
        BookResponseEntity entity =
            BookResponseEntity.fromJson(json.decode(responseBody));
        // 接口中拿到数据之后,通过sink.add 添加一条流即可, 这样在StreamBuild中就会有回调。
        _sink.add(entity.data);
      }
      /// 资源
      dispose() {
        _sink.close();
        _controller.close();
      }
    }
    

    组件类:

    class BookList extends StatefulWidget {
      @override
      State<StatefulWidget> createState() {
        return _BookListState();
      }
    }
    
    //https://api.apiopen.top/getSingleJoke?sid=28654780
    //https://www.apiopen.top/novelApi
    
    class _BookListState extends State<BookList> {
      StreamModel streamModel;
    
      @override
      void initState() {
        super.initState();
        streamModel = StreamModel();
      }
    
      @override
      Widget build(BuildContext context) {
    //    print("build");
        return Scaffold(
          appBar: AppBar(
            title: Text("stream demo"),
          ),
          body: Container(
            child: StreamBuilder<List<BookResponseData>>(
              stream: streamModel.stream,  // 要监听的流 
              initialData: [], // 初始值,可以不设
              builder: (context, a) { // sink.add 后,就会回调这个方法。
                List<Widget> views = [];
                if (a.data != null && a.data.length > 0) {
                  a.data.forEach((BookResponseData data) {
                    views.add(Container(
                      padding: EdgeInsets.all(10.0),
                      child: Column(
                        children: <Widget>[
                          Text(
                            data.bookname,
                            style: TextStyle(fontWeight: FontWeight.w600),
                          ),
                          Text(data.bookInfo),
                        ],
                      ),
                    ));
                  });
                }
                return ListView(
                  children: views,
                );
              },
            ),
          ),
          floatingActionButton: FloatingActionButton(
              child: Text("获取数据"),
              onPressed: () {
                streamModel.getBookList();
              }),
        );
      }
      @override
      void dispose() {
        streamModel.dispose(); 
        super.dispose();
      }
    }
    

    以上便可以实现基于Stream的响应式组件。

    又有问题了。。。。 没办法问题就是这么多 (来打我... )

    为什么StreamBuilder能够监听到Stream的变化来刷新UI?
    跟进去看一下源码

    void _subscribe() {
      if (widget.stream != null) {
        _subscription = widget.stream.listen((T data) {
          setState(() {
            _summary = widget.afterData(_summary, data);
          });
        }, onError: (Object error) {
          setState(() {
            _summary = widget.afterError(_summary, error);
          });
        }, onDone: () {
          setState(() {
            _summary = widget.afterDone(_summary);
          });
        });
        _summary = widget.afterConnected(_summary);
      }
    }
    

    你会发现 StreamBuilder是一个StatefulWidget, 本质还是在stream.listen中通过setState 来实现响应数据刷新View。

    这样,对Stream及基于Stream的响应式组件就有个大致的了解了... ...

    over。。。

    相关文章

      网友评论

          本文标题:Flutter中的Stream初探

          本文链接:https://www.haomeiwen.com/subject/dormectx.html