美文网首页FlutterFlutter学习Flutter
这可能是最早的RxDart使用入门教程。。。

这可能是最早的RxDart使用入门教程。。。

作者: 吉原拉面 | 来源:发表于2018-09-19 17:57 被阅读1391次

    原文地址:RxDart: Magical transformations of Streams
    国际惯例,英文好的请去看原文,我的翻译可能会省去一些东西,同时加入一些自己的想法,便于自己理解。与其说是一篇翻译,不如说是一份自己的学习笔记。

      在这篇文章中,我们将来谈一谈如何使用RxDart对Streams进行各种神奇对变换,我会重点介绍RxDart中的常用函数,同时讲解对应的App。
      RxDart是 Frank PepermansBrian Egan两位大神主导的项目,是Rx大家族对Dart语言的支持。如果你已经使用过其他语言的Rx库,那么对于RxDart应该很容易上手,因为很多方法函数上听起来都差不多。
      文章中的示例代码可以在GitHub中找到:https://github.com/escamoteur/stream_rx_tutorial/tree/rx_magic
      我们使用Streams在App中传输数据,但其实Stream的使用场景非常广泛,所以我们今天来讲一讲RxDart在使用Stream时有哪些特性。

    创建Observables

       Observables比直接使用普通Streams拥有更多的特性,我们有很多方法来创建一个Observable。

    • 从一个Stream中创建
      通过将任意Stream传给Observable的构造函数,就得到了一个Observable实例:
    var controller = new StreamController<String>();
    var streamObservable = new Observable(controller.stream);
    streamObservable.listen(print);
    
    • 创建周期性事件(Periodic events)
      我们可以创建一个Observable,然后以特性的频率持续发射数据流,这个功能类似于计时器(Timer),但是用起来会更方便一点:
    var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() );
    timerObservable.listen(print);
    
    • 从单个值中创建
      有时候API需要一个Stream/Observable,但是你的初始数据只是一个简单的值,这个时候你可以使用just工厂函数:
    var justObservable = Observable<int>.just(42);
    justObservable.listen(print);
    // this will print : 42
    
    • 从Future中创建
      通过一个Future创建的Observable,会先等待Future执行完毕,完后发射数据,这个输出数据就是Future的执行结果,如果Future没有任何返回值,那么输出null。另一种从Future中创建Stream的方法是调用Future的toStream()方法。
      你可能有点好奇,我都等Future执行完了,为什么还要把它转化为一个Observable/Stream呢?放心,当你后边看到在Stream上利用各种函数操作数据有多方便的时候就明白了。
     Future<String> asyncFunction() async {
        return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult");
      }
    
      test('Create Observable from Future', () async {
        print('start');
        var fromFutureObservable = Observable.fromFuture(asyncFunction());
        fromFutureObservable.listen(print);
      }
    

    Subjects

      Subjects是RxDart的流控制器(StreamController),但Subjects但行为跟StreamControllers还是有些区别的:

    • 你可以在一个Subject上直接listen(),而不需要拥有对这个Stream的访问权限;
    • 你可以添加多个subscription,它们会同时收到同样的数据;
    • Subjects有三种类型,我们下面会用例子来说明。
    PublishSubjects

      和StreamControllers的行为很像,也支持多个监听:

    var subject = new PublishSubject<String>();
    subject.listen((item) => print(item)); 
    subject.add("Item1");
    
    // 添加第二个listener
    subject.listen((item) => print(item.toUpperCase())); 
    
    subject.add("Item2");
    subject.add("Item3");
    
    // 这句话只是为了防止在数据处理完毕之前,进程就被杀死了
    await Future.delayed(Duration(seconds: 5));
    
    // 取消所有的Subscriptions
    subject.close;
    

      运行上边的代码,输出结果如下:

    Item1
    ITEM2
    Item2
    ITEM3
    Item3
    ✓ PublishSubject
    

      因为第二个监听是在中途加进来的,所以它并没有监听到数据Item1。

    BehaviourSubject

      每一个新加的监听,接收到的第一个数据都是上一个数据(再往前的数据不会监听到,只会缓存一个数据)。

    var subject = new BehaviorSubject<String>();
    subject.listen((item) => print(item)); 
    
    subject.add("Item1");
    subject.add("Item2");
    
    subject.listen((item) => print(item.toUpperCase())); 
    
    subject.add("Item3");
    

      运行上边的代码,输出结果如下:

    Item1
    ITEM2
    ITEM3
    Item2
    Item3
    ✓ BehaviourSubject
    

      我们发现第二个subscriber没有监听到Item1,但是监听到了Item2,而且第二个subscriber比第一个subscriber先监听到了Item3。这是因为,你没法决定多个监听的服务顺序(实际上对于单个item,总是后加的监听先接收到数据),但是每个监听获取到的数据依然是有序的。BehaviourSubject只会为后加的Subscribers缓存最近的一条输出数据。如果你想要缓存更多的数据,可以使用ReplaySubject
    ,但是大多数情况下我们都用不到。
      我们可以多加几个item,可以看的更清楚一些:

        var subject = new BehaviorSubject<String>();
    
        subject.listen((item) => print("$item 第1个"));
    
        subject.add("Item1");
        subject.add("Item2");
        subject.add("Item3");
        subject.add("Item4");
    
        subject.listen((item) => print("$item 第2个"));
        subject.add("Item5");
        subject.add("Item6");
        subject.add("Item7");
    
        subject.listen((item) => print("$item 第3个"));
        subject.add("Item8");
        subject.add("Item9");
    

      输出结果:

    I/flutter (18732): Item1 第1个
    I/flutter (18732): Item4 第2个
    I/flutter (18732): Item5 第2个
    I/flutter (18732): Item7 第3个
    I/flutter (18732): Item8 第3个
    I/flutter (18732): Item2 第1个
    I/flutter (18732): Item6 第2个
    I/flutter (18732): Item9 第3个
    I/flutter (18732): Item3 第1个
    I/flutter (18732): Item7 第2个
    I/flutter (18732): Item4 第1个
    I/flutter (18732): Item8 第2个
    I/flutter (18732): Item5 第1个
    I/flutter (18732): Item9 第2个
    I/flutter (18732): Item6 第1个
    I/flutter (18732): Item7 第1个
    I/flutter (18732): Item8 第1个
    I/flutter (18732): Item9 第1个
    

      第1个监听可以接收item1~item9,第2个监听可以接收item4~item9,第3个监听可以接收item7~item9;对于item4~6,第2个监听比第1个监听先接收到数据;对于item7~7,第3个监听最先接收到数据,然后是第2个监听,最后才是第1个监听;
      不难看出,每一个新加的监听接收到的第一条数据,是最近的那条数据(也就是只会缓存最近一条数据的意思);对于单条数据而言,总是后加的监听先接收到

    数据操作

      Rx的最大魅力就是让你能够在Stream上自由操作数据,每一个Rx方法都会返回一个新的Stream,同时携带了一个返回值,这意味着你可以链式调用,这是非常有用的。

    Map:数据转换

      如果你不想错过Stream中的每一个操作,那么请使用map()map()会接收每一个数据,处理之后将数据再push出去,作为返回值返回给Stream:

    var subject = new PublishSubject<String>();
    
    subject.map((item) => item.toUpperCase()).listen(print);
    
    subject.add("Item1");
    subject.add("Item2");
    subject.add("Item3");
    

      输出结果:

    ITEM1
    ITEM2
    ITEM3
    ✓ Map toUpper
    

      map的返回值类型不一定要和输入值类型相同,下面这个例子就把
    integers转换为了Strings,我们组合了两个map操作:

    var subject = new PublishSubject<int>();
    
    subject.map((intValue) => intValue.toString())
        .map((item) => item.toUpperCase())
        .listen(print);
    
    subject.add(1);
    subject.add(2);
    subject.add(3);
    

      通过map我们可以随意转换数据:


    class DataClass{}
    
    class WrapperClass
    {
      final A wrapped;
    
      WrapperClass(this.wrapped); 
    }
    
    var subject = new PublishSubject<A>();
    
    subject.map<WrapperClass>((a) => new Wrapper(a));
    

      .map最常见的使用场景就是:当你从REST API或者数据库中读取数据时,需要将这些数据转化为你需要的自定义类型:

    class User {
      final String name;
      final String adress;
      final String phoneNumber;
      final int age;
    
      // 实际项目中,我推荐大家使用serializer插件,而不是手动写serializer
      factory User.fromJson(String jsonString) {
        var jsonMap = json.decode(jsonString);
    
        return User(
          jsonMap['name'],
          jsonMap['adress'],
          jsonMap['phoneNumber'],
          jsonMap['age'],
        );
      }
    
      User(this.name, this.adress, this.phoneNumber, this.age);
    
      @override
      String toString() {
        return '$name - $adress - $phoneNumber - $age';
      }
    }
    
    void main() {
      test('Map', () {
        // 一些比较恶心的源数据
        var jsonStrings = [
          '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }',
          '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }',
          '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }',
        ];
    
        // 我们模拟从API/数据库中读到了一串json字符串流,并且构造了一个Subject
        // 实际情况下,我们可能是通过类似`asyncWebCallFcuntion().asStream()`的方法来获取的
        var dataStreamFromAPI = new PublishSubject<String>();
    
        dataStreamFromAPI
            .map<User>((jsonString) => User.fromJson(jsonString)) // 这里将json字符串转为了我们需要的User对象
            .listen((user) => print(user.toString()));
    
    
        // 模拟输入数据
        dataStreamFromAPI.add(jsonStrings[0]);
        dataStreamFromAPI.add(jsonStrings[1]);
        dataStreamFromAPI.add(jsonStrings[2]);
      });
    

      说句题外话,除了Streams之外,每一个Iterable都提供了map方法,将其转化为一个List。

    Where:数据过滤

      如果你只关心Stream中的特定数据,那么可以使用.where()函数。这个函数其实就是替代if语句的,但是.where()会更加方便阅读:

    var subject = new PublishSubject<int>();
    
    subject.where((val) => val.isOdd)
        .listen( (val) => print('This only prints odd numbers: $val'));
    
    
    subject.where((val) => val.isEven)
    .listen( (val) => print('This only prints even numbers: $val'));
    
    
    subject.add(1);
    subject.add(2);
    subject.add(3);
    
    prints:
    This only prints odd numbers: 1
    This only prints even numbers: 2
    This only prints odd numbers: 3
    
    Debounce:数据拦截

      来想象一个实际场景:你有一个输入框,里边的文字内容改变的时候,会触发一个函数。用户每按一个键就触发一次函数调用,这个操作挺昂贵的。所以,你可能希望在用户的连续输入暂停一段时间后,再触发这个函数。debounce()就是为了解决这个问题的,它会在特定时间内吞掉所有的输入事件:

    var subject = new PublishSubject<String>();
    
    
    subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s));
    
    
    subject.add('A');
    subject.add('AB');
    
    await Future.delayed(Duration(milliseconds: 200));
    
    
    subject.add("ABC");
    // 这时还没有任何的output
    
    await Future.delayed(Duration(milliseconds: 700));
    
    // 现在我们接收到了最终值: 'ABC'
    
    Expand:展开数组

      如果你的数据源是一串数组,但是你希望得到的是这些数组中的每一个值,那么可以使用.expand


      详细的使用方法可以在demo中的FireStore例子中看到。
    Merge:数据合并

      如果你有多个Stream,然后希望同时处理它们的数据,那么可以使用.mergeWith(在有些Rx库中这个方法名就叫merge)。它可以将多个Streams组合为一个Stream:


      但是要注意,.mergeWith的结果顺序是没法手动指定的,哪个Stream的数据先进来,那么就排在前面,先进先出。
      举个例子,你有两个组建都通过各自的Stream报错了,你希望这些报错能显示在同一个Dialog中:
    @override
    initState()
    {
      super.initState();
    
      component1.errors.mergeWith([component2.errors])
        .listen( (error) async => await showDialog(error.message));
    }
    

      或者你想要从不同的网络中获取信息:

    final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data));
    final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data));
    final postStream = observableTwitter.mergeWith([observableFacebook]);
    
    Distinct:过滤相同数据

      假设我们有两个不同的Observable:isBusyOne 和 isBusyTwo,但是它们的结果是一样的,那么它们会用同样的数据去各自刷新一遍UI。但我们知道,这是不必要的。为了防止这种情况发生,我们可以使用.distinct()(有些Rx库将这个方法命名为distinctUntilChanged)。它保证了传递给Stream的每一条数据都和上一条不一样:

    bservable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
    
    ZipWith:数据合并

      zipWith也是将一个Stream和另一个合并到一起,但是它和.mergeWith不一样。.mergeWith只要拿到了数据就立刻发射出去,但是zipWith会等到所有数据都接收完毕后,将这些数据重组后再发射出去:


      一个最基本使用方法是:
    new Observable.just(1) // .just() creates an Observable that directly emits the past value
        .zipWith(new Observable.just(2), (one, two) => one + two)
        .listen(print); // prints 3
    

      在App开发中一个很常见的场景就是:你需要等待两个异步函数完成,每个异步函数都返回了一个Future,你希望这两个异步操作都完成之后再处理数据。下面这个例子中,我们模拟一个异步操作返回一个User,另一个返回一串json字符串,我们在调用Invoice之前,需要等待这两个操作都完成:

    class Invoice {
      final User user;
      final Product product;
    
      Invoice(this.user, this.product);
    
      printInvoice() {
        print(user.toString());
        print(product.toString());
      }
    }
    
    // 模拟网络返回JSON字符串
    Future<String> getProduct() async {
      print("Started getting product");
      await Future.delayed(Duration(seconds: 2));
      print("Finished getting product");
      return '{"name": "Flux compensator", "price": 99999.99}';
    }
    
    // 模拟网络返回JSON字符串
    Future<String> getUser() async {
      print("Started getting User");
      await Future.delayed(Duration(seconds: 4));
      print("Finished getting User");
      return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }';
    }
    
    void main() {
      test('zipWith', () async {
        var userObservable =
            Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString));
    
        var productObservable = Observable.fromFuture(getProduct())
            .map<Product>((jsonString) => Product.fromJson(jsonString));
    
        Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>(
            productObservable, (user, product) => Invoice(user, product));
    
    
        print("Start listening for invoices");
        invoiceObservable.listen((invoice) => invoice.printInvoice());
    
        // 这段话只是为了防止stream数据操作完成时进程被杀死
        await Future.delayed(Duration(seconds: 5));
      });
    }
    

      输出结果:

    Started getting User
    Started getting product
    Start listening for invoices
    Finished getting product
    Finished getting User
    Jon Doe - New York - 424242 - 42
    Flux compensator - 99999.99
    
    CombineLatest:组合数据

       combineLatest跟merge和zip一样也是组合数据,但是有一些轻微都区别。它接收到一个Stream的数据后,不仅仅会发射这个Stream带来的数据,还会将其他Stream中的最近的数据也发射出去。也就是说,有n个Stream,它每一次就发射n个数据,发射的数据是每个Stream上最近的一条数据;任意一个Stream的数据进来的时候,都会触发一次发射;刚开始的时候,数据种类不足n时,会等待(也就是第一次发射必须保证每个Stream都有数据被传递过来):


       和之前的方法都不一样,combineLatest不是一个实例方法,而是一个静态方法。Dart也不允许开发者去定义各种版本的combineLatest2、combineLatest3······ combineLatest n。
       如果你有两个Observable<bool>来标示你的App某些部分处于busy状态,你想要展示一个busy spinner,那么最佳的实现方案是:
    class Model
    {
      Observable<bool> get isBusy => 
        Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2);
    
      PublishSubject<bool> isBusyOne;
      PublishSubject<bool> isBusyTwo;
    }
    

       在UI层,你可以用一个StreamBuilder来展示busy spinner,如果接收值是true则显示。
       combineLatest和FireStore snapshots streams结合使用也是很强大的。假设你的App有一个这样的需求:将股票行情和天气预报展示在一起。两个数据被存放在了两个不同的FireStore集合中,两者都是通过不同的后端服务独立更新。你可以通过StreamBuilder来更新数据,配合combineLatest使用会非常简单:

    class WeatherForecast {
      final String forecastText;
      final GeoPoint location;
    
      factory WeatherForecast.fromMap(Map<String, dynamic> map) {
        return WeatherForecast(map['forecastText'], map['location']);
      }
    
      WeatherForecast(this.forecastText, this.location);
    }
    
    class NewsMessage {
      final String newsText;
      final GeoPoint location;
    
      factory NewsMessage.fromMap(Map<String, dynamic> map) {
        return NewsMessage(map['newsText'], map['location']);
      }
    
      NewsMessage(this.newsText, this.location);
    }
    
    class CombinedMessage {
      final WeatherForecast forecast;
      final NewsMessage newsMessage;
    
      CombinedMessage(this.forecast, this.newsMessage);
    }
    
    class Model {
      CollectionReference weatherCollection;
      CollectionReference newsCollection;
    
      Model() {
        weatherCollection = Firestore.instance.collection('weather');
        newsCollection = Firestore.instance.collection('news');
      }
    
      Observable<CombinedMessage> getCombinedMessages() {
        Observable<WeatherForecast> weatherForecasts = weatherCollection
            .snapshots()
            .expand((snapShot) => snapShot.documents)
            .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data));
    
        Observable<NewsMessage> news = newsCollection
            .snapshots()
            .expand((snapShot) => snapShot.documents)
            .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
    
        return Observable.combineLatest2(
            weatherForecasts, news, (weather, news) => CombinedMessage(weather, news));
      }
    }
    

       在UI层,你可以这么写:StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...)

    AsyncMap:异步数据转换

       除了map(),还有一个asyncMap方法,让你可以在map函数中进行异步操作。我们可以将上面的FireStore例子稍微修改一下,WeatherForecast取决于NewsMessage中的location的值,并且只有NewsMessage更新的时候才会跟着一起更新:

    Observable<CombinedMessage> getDependendMessages() {
    
      Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) {
        return snapShot.documents;
      }).map<NewsMessage>((document) {
        return NewsMessage.fromMap(document.data);
      });
    
      return news.asyncMap((newsEntry) async {
        var weatherDocuments =
            await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments();
        return new CombinedMessage(
            WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry);
      });
    }
    

       每次newsCollection变化的时候,getDependendMessages返回的Observable会发射一个新的CombinedMessage

    Observables调试

       Dart的=>使得在debug的时候很困难:

    Observable<NewsMessage> news = newsCollection
        .snapshots()
        .expand((snapShot) => snapShot.documents)
        .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
    

       所以如果要打断点的话,请将=>展开(IDE一般会有Convert to block body的方法,比如Android Studio可以使用Alt+Enter):

     Observable<NewsMessage> news = newsCollection
            .snapshots()
            .expand((snapShot) {
              return snapShot.documents;
            })
            .map<NewsMessage>((document) {
              return NewsMessage.fromMap(document.data);
            });
    

    注意点

       RxDart的transforming函数应该只用来处理数据流,所以不要尝试在这些函数中修改变量/状态(variables/state),这些代码请写在.listen中。所以,不要这么写:

    Observable.fromFuture(getProduct())
            .map<Product>((jsonString) { 
         var product = Product.fromJson(jsonString);
        database.save(product);
        setState((){ _product =  product });
        return product;
    }).listen();
    

      而是这么写:

    Observable.fromFuture(getProduct())
            .map<Product>((jsonString) => Product.fromJson(jsonString))
            .listen( (product) {
              database.save(product);  
              setState((){ _product =  product });
            });
    

      map()函数的唯一作用就是数据转换,不要在里面做任何多余的操作。在map函数里面写其他操作也会降低代码的可读性,也容易隐藏一些bug。

    资源释放

      为了防止内存泄漏,请在适当的时候调用subscriptionscancel()方法,或者者dispose掉你的StreamControllers,或者关闭你的Subjects

    相关文章

      网友评论

        本文标题:这可能是最早的RxDart使用入门教程。。。

        本文链接:https://www.haomeiwen.com/subject/rydinftx.html