原文地址:RxDart: Magical transformations of Streams
(国际惯例,英文好的请去看原文,我的翻译可能会省去一些东西,同时加入一些自己的想法,便于自己理解。与其说是一篇翻译,不如说是一份自己的学习笔记。)
在这篇文章中,我们将来谈一谈如何使用RxDart对Streams进行各种神奇对变换,我会重点介绍RxDart中的常用函数,同时讲解对应的App。
RxDart是 Frank Pepermans 和 Brian Egan两位大神主导的项目,是Rx大家族对Dart语言的支持。如果你已经使用过其他语言的Rx库,那么对于RxDart应该很容易上手,因为很多方法函数上听起来都差不多。
文章中的示例代码可以在GitHub中找到:https://github.com/escamoteur/stream_rx_tutorial/tree/rx_magic
我们使用Streams在App中传输数据,但其实Stream的使用场景非常广泛,所以我们今天来讲一讲RxDart在使用Stream时有哪些特性。
创建Observables
Observables比直接使用普通Streams拥有更多的特性,我们有很多方法来创建一个Observable。
- 从一个Stream中创建
通过将任意Stream
传给Observable
的构造函数,就得到了一个Observable
实例:
var controller = new StreamController<String>();
var streamObservable = new Observable(controller.stream);
streamObservable.listen(print);
- 创建周期性事件(Periodic events)
我们可以创建一个Observable
,然后以特性的频率持续发射数据流,这个功能类似于计时器(Timer
),但是用起来会更方便一点:
var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() );
timerObservable.listen(print);
- 从单个值中创建
有时候API需要一个Stream/Observable
,但是你的初始数据只是一个简单的值,这个时候你可以使用just
工厂函数:
var justObservable = Observable<int>.just(42);
justObservable.listen(print);
// this will print : 42
- 从Future中创建
通过一个Future
创建的Observable
,会先等待Future
执行完毕,完后发射数据,这个输出数据就是Future
的执行结果,如果Future没有任何返回值,那么输出null
。另一种从Future
中创建Stream
的方法是调用Future的toStream()
方法。
你可能有点好奇,我都等Future
执行完了,为什么还要把它转化为一个Observable/Stream
呢?放心,当你后边看到在Stream
上利用各种函数操作数据有多方便的时候就明白了。
Future<String> asyncFunction() async {
return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult");
}
test('Create Observable from Future', () async {
print('start');
var fromFutureObservable = Observable.fromFuture(asyncFunction());
fromFutureObservable.listen(print);
}
Subjects
Subjects
是RxDart的流控制器(StreamController
),但Subjects
但行为跟StreamControllers
还是有些区别的:
- 你可以在一个
Subject
上直接listen()
,而不需要拥有对这个Stream
的访问权限; - 你可以添加多个
subscription
,它们会同时收到同样的数据; -
Subjects
有三种类型,我们下面会用例子来说明。
PublishSubjects
和StreamControllers的行为很像,也支持多个监听:
var subject = new PublishSubject<String>();
subject.listen((item) => print(item));
subject.add("Item1");
// 添加第二个listener
subject.listen((item) => print(item.toUpperCase()));
subject.add("Item2");
subject.add("Item3");
// 这句话只是为了防止在数据处理完毕之前,进程就被杀死了
await Future.delayed(Duration(seconds: 5));
// 取消所有的Subscriptions
subject.close;
运行上边的代码,输出结果如下:
Item1
ITEM2
Item2
ITEM3
Item3
✓ PublishSubject
因为第二个监听是在中途加进来的,所以它并没有监听到数据Item1。
BehaviourSubject
每一个新加的监听,接收到的第一个数据都是上一个数据(再往前的数据不会监听到,只会缓存一个数据)。
var subject = new BehaviorSubject<String>();
subject.listen((item) => print(item));
subject.add("Item1");
subject.add("Item2");
subject.listen((item) => print(item.toUpperCase()));
subject.add("Item3");
运行上边的代码,输出结果如下:
Item1
ITEM2
ITEM3
Item2
Item3
✓ BehaviourSubject
我们发现第二个subscriber没有监听到Item1,但是监听到了Item2,而且第二个subscriber比第一个subscriber先监听到了Item3。这是因为,你没法决定多个监听的服务顺序(实际上对于单个item,总是后加的监听先接收到数据),但是每个监听获取到的数据依然是有序的。BehaviourSubject
只会为后加的Subscribers缓存最近的一条输出数据。如果你想要缓存更多的数据,可以使用ReplaySubject
,但是大多数情况下我们都用不到。
我们可以多加几个item,可以看的更清楚一些:
var subject = new BehaviorSubject<String>();
subject.listen((item) => print("$item 第1个"));
subject.add("Item1");
subject.add("Item2");
subject.add("Item3");
subject.add("Item4");
subject.listen((item) => print("$item 第2个"));
subject.add("Item5");
subject.add("Item6");
subject.add("Item7");
subject.listen((item) => print("$item 第3个"));
subject.add("Item8");
subject.add("Item9");
输出结果:
I/flutter (18732): Item1 第1个
I/flutter (18732): Item4 第2个
I/flutter (18732): Item5 第2个
I/flutter (18732): Item7 第3个
I/flutter (18732): Item8 第3个
I/flutter (18732): Item2 第1个
I/flutter (18732): Item6 第2个
I/flutter (18732): Item9 第3个
I/flutter (18732): Item3 第1个
I/flutter (18732): Item7 第2个
I/flutter (18732): Item4 第1个
I/flutter (18732): Item8 第2个
I/flutter (18732): Item5 第1个
I/flutter (18732): Item9 第2个
I/flutter (18732): Item6 第1个
I/flutter (18732): Item7 第1个
I/flutter (18732): Item8 第1个
I/flutter (18732): Item9 第1个
第1个监听可以接收item1~item9,第2个监听可以接收item4~item9,第3个监听可以接收item7~item9;对于item4~6,第2个监听比第1个监听先接收到数据;对于item7~7,第3个监听最先接收到数据,然后是第2个监听,最后才是第1个监听;
不难看出,每一个新加的监听接收到的第一条数据,是最近的那条数据(也就是只会缓存最近一条数据的意思);对于单条数据而言,总是后加的监听先接收到。
数据操作
Rx的最大魅力就是让你能够在Stream上自由操作数据,每一个Rx方法都会返回一个新的Stream,同时携带了一个返回值,这意味着你可以链式调用,这是非常有用的。
Map:数据转换
如果你不想错过Stream中的每一个操作,那么请使用map()
。map()
会接收每一个数据,处理之后将数据再push出去,作为返回值返回给Stream:
var subject = new PublishSubject<String>();
subject.map((item) => item.toUpperCase()).listen(print);
subject.add("Item1");
subject.add("Item2");
subject.add("Item3");
输出结果:
ITEM1
ITEM2
ITEM3
✓ Map toUpper
map的返回值类型不一定要和输入值类型相同,下面这个例子就把
integers转换为了Strings,我们组合了两个map操作:
var subject = new PublishSubject<int>();
subject.map((intValue) => intValue.toString())
.map((item) => item.toUpperCase())
.listen(print);
subject.add(1);
subject.add(2);
subject.add(3);
通过map我们可以随意转换数据:
class DataClass{}
class WrapperClass
{
final A wrapped;
WrapperClass(this.wrapped);
}
var subject = new PublishSubject<A>();
subject.map<WrapperClass>((a) => new Wrapper(a));
.map
最常见的使用场景就是:当你从REST API或者数据库中读取数据时,需要将这些数据转化为你需要的自定义类型:
class User {
final String name;
final String adress;
final String phoneNumber;
final int age;
// 实际项目中,我推荐大家使用serializer插件,而不是手动写serializer
factory User.fromJson(String jsonString) {
var jsonMap = json.decode(jsonString);
return User(
jsonMap['name'],
jsonMap['adress'],
jsonMap['phoneNumber'],
jsonMap['age'],
);
}
User(this.name, this.adress, this.phoneNumber, this.age);
@override
String toString() {
return '$name - $adress - $phoneNumber - $age';
}
}
void main() {
test('Map', () {
// 一些比较恶心的源数据
var jsonStrings = [
'{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }',
'{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }',
'{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }',
];
// 我们模拟从API/数据库中读到了一串json字符串流,并且构造了一个Subject
// 实际情况下,我们可能是通过类似`asyncWebCallFcuntion().asStream()`的方法来获取的
var dataStreamFromAPI = new PublishSubject<String>();
dataStreamFromAPI
.map<User>((jsonString) => User.fromJson(jsonString)) // 这里将json字符串转为了我们需要的User对象
.listen((user) => print(user.toString()));
// 模拟输入数据
dataStreamFromAPI.add(jsonStrings[0]);
dataStreamFromAPI.add(jsonStrings[1]);
dataStreamFromAPI.add(jsonStrings[2]);
});
说句题外话,除了Streams之外,每一个Iterable都提供了map方法,将其转化为一个List。
Where:数据过滤
如果你只关心Stream中的特定数据,那么可以使用.where()
函数。这个函数其实就是替代if
语句的,但是.where()
会更加方便阅读:
var subject = new PublishSubject<int>();
subject.where((val) => val.isOdd)
.listen( (val) => print('This only prints odd numbers: $val'));
subject.where((val) => val.isEven)
.listen( (val) => print('This only prints even numbers: $val'));
subject.add(1);
subject.add(2);
subject.add(3);
prints:
This only prints odd numbers: 1
This only prints even numbers: 2
This only prints odd numbers: 3
Debounce:数据拦截
来想象一个实际场景:你有一个输入框,里边的文字内容改变的时候,会触发一个函数。用户每按一个键就触发一次函数调用,这个操作挺昂贵的。所以,你可能希望在用户的连续输入暂停一段时间后,再触发这个函数。debounce()
就是为了解决这个问题的,它会在特定时间内吞掉所有的输入事件:
var subject = new PublishSubject<String>();
subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s));
subject.add('A');
subject.add('AB');
await Future.delayed(Duration(milliseconds: 200));
subject.add("ABC");
// 这时还没有任何的output
await Future.delayed(Duration(milliseconds: 700));
// 现在我们接收到了最终值: 'ABC'
Expand:展开数组
如果你的数据源是一串数组,但是你希望得到的是这些数组中的每一个值,那么可以使用.expand
:
详细的使用方法可以在demo中的FireStore例子中看到。
Merge:数据合并
如果你有多个Stream,然后希望同时处理它们的数据,那么可以使用.mergeWith
(在有些Rx库中这个方法名就叫merge)。它可以将多个Streams组合为一个Stream:
但是要注意,
.mergeWith
的结果顺序是没法手动指定的,哪个Stream的数据先进来,那么就排在前面,先进先出。举个例子,你有两个组建都通过各自的Stream报错了,你希望这些报错能显示在同一个Dialog中:
@override
initState()
{
super.initState();
component1.errors.mergeWith([component2.errors])
.listen( (error) async => await showDialog(error.message));
}
或者你想要从不同的网络中获取信息:
final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data));
final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data));
final postStream = observableTwitter.mergeWith([observableFacebook]);
Distinct:过滤相同数据
假设我们有两个不同的Observable:isBusyOne 和 isBusyTwo,但是它们的结果是一样的,那么它们会用同样的数据去各自刷新一遍UI。但我们知道,这是不必要的。为了防止这种情况发生,我们可以使用.distinct()
(有些Rx库将这个方法命名为distinctUntilChanged)。它保证了传递给Stream的每一条数据都和上一条不一样:
bservable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();
ZipWith:数据合并
zipWith
也是将一个Stream和另一个合并到一起,但是它和.mergeWith
不一样。.mergeWith
只要拿到了数据就立刻发射出去,但是zipWith
会等到所有数据都接收完毕后,将这些数据重组后再发射出去:
一个最基本使用方法是:
new Observable.just(1) // .just() creates an Observable that directly emits the past value
.zipWith(new Observable.just(2), (one, two) => one + two)
.listen(print); // prints 3
在App开发中一个很常见的场景就是:你需要等待两个异步函数完成,每个异步函数都返回了一个Future,你希望这两个异步操作都完成之后再处理数据。下面这个例子中,我们模拟一个异步操作返回一个User,另一个返回一串json字符串,我们在调用Invoice之前,需要等待这两个操作都完成:
class Invoice {
final User user;
final Product product;
Invoice(this.user, this.product);
printInvoice() {
print(user.toString());
print(product.toString());
}
}
// 模拟网络返回JSON字符串
Future<String> getProduct() async {
print("Started getting product");
await Future.delayed(Duration(seconds: 2));
print("Finished getting product");
return '{"name": "Flux compensator", "price": 99999.99}';
}
// 模拟网络返回JSON字符串
Future<String> getUser() async {
print("Started getting User");
await Future.delayed(Duration(seconds: 4));
print("Finished getting User");
return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }';
}
void main() {
test('zipWith', () async {
var userObservable =
Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString));
var productObservable = Observable.fromFuture(getProduct())
.map<Product>((jsonString) => Product.fromJson(jsonString));
Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>(
productObservable, (user, product) => Invoice(user, product));
print("Start listening for invoices");
invoiceObservable.listen((invoice) => invoice.printInvoice());
// 这段话只是为了防止stream数据操作完成时进程被杀死
await Future.delayed(Duration(seconds: 5));
});
}
输出结果:
Started getting User
Started getting product
Start listening for invoices
Finished getting product
Finished getting User
Jon Doe - New York - 424242 - 42
Flux compensator - 99999.99
CombineLatest:组合数据
combineLatest跟merge和zip一样也是组合数据,但是有一些轻微都区别。它接收到一个Stream的数据后,不仅仅会发射这个Stream带来的数据,还会将其他Stream中的最近的数据也发射出去。也就是说,有n个Stream,它每一次就发射n个数据,发射的数据是每个Stream上最近的一条数据;任意一个Stream的数据进来的时候,都会触发一次发射;刚开始的时候,数据种类不足n时,会等待(也就是第一次发射必须保证每个Stream都有数据被传递过来):
和之前的方法都不一样,combineLatest不是一个实例方法,而是一个静态方法。Dart也不允许开发者去定义各种版本的combineLatest2、combineLatest3······ combineLatest n。
如果你有两个
Observable<bool>
来标示你的App某些部分处于busy状态,你想要展示一个busy spinner,那么最佳的实现方案是:
class Model
{
Observable<bool> get isBusy =>
Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2);
PublishSubject<bool> isBusyOne;
PublishSubject<bool> isBusyTwo;
}
在UI层,你可以用一个StreamBuilder来展示busy spinner,如果接收值是true则显示。
combineLatest和FireStore snapshots streams结合使用也是很强大的。假设你的App有一个这样的需求:将股票行情和天气预报展示在一起。两个数据被存放在了两个不同的FireStore集合中,两者都是通过不同的后端服务独立更新。你可以通过StreamBuilder来更新数据,配合combineLatest使用会非常简单:
class WeatherForecast {
final String forecastText;
final GeoPoint location;
factory WeatherForecast.fromMap(Map<String, dynamic> map) {
return WeatherForecast(map['forecastText'], map['location']);
}
WeatherForecast(this.forecastText, this.location);
}
class NewsMessage {
final String newsText;
final GeoPoint location;
factory NewsMessage.fromMap(Map<String, dynamic> map) {
return NewsMessage(map['newsText'], map['location']);
}
NewsMessage(this.newsText, this.location);
}
class CombinedMessage {
final WeatherForecast forecast;
final NewsMessage newsMessage;
CombinedMessage(this.forecast, this.newsMessage);
}
class Model {
CollectionReference weatherCollection;
CollectionReference newsCollection;
Model() {
weatherCollection = Firestore.instance.collection('weather');
newsCollection = Firestore.instance.collection('news');
}
Observable<CombinedMessage> getCombinedMessages() {
Observable<WeatherForecast> weatherForecasts = weatherCollection
.snapshots()
.expand((snapShot) => snapShot.documents)
.map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data));
Observable<NewsMessage> news = newsCollection
.snapshots()
.expand((snapShot) => snapShot.documents)
.map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
return Observable.combineLatest2(
weatherForecasts, news, (weather, news) => CombinedMessage(weather, news));
}
}
在UI层,你可以这么写:StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...)
AsyncMap:异步数据转换
除了map()
,还有一个asyncMap
方法,让你可以在map函数中进行异步操作。我们可以将上面的FireStore例子稍微修改一下,WeatherForecast取决于NewsMessage中的location的值,并且只有NewsMessage更新的时候才会跟着一起更新:
Observable<CombinedMessage> getDependendMessages() {
Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) {
return snapShot.documents;
}).map<NewsMessage>((document) {
return NewsMessage.fromMap(document.data);
});
return news.asyncMap((newsEntry) async {
var weatherDocuments =
await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments();
return new CombinedMessage(
WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry);
});
}
每次newsCollection
变化的时候,getDependendMessages
返回的Observable
会发射一个新的CombinedMessage
。
Observables调试
Dart的=>
使得在debug的时候很困难:
Observable<NewsMessage> news = newsCollection
.snapshots()
.expand((snapShot) => snapShot.documents)
.map<NewsMessage>((document) => NewsMessage.fromMap(document.data));
所以如果要打断点的话,请将=>
展开(IDE一般会有Convert to block body
的方法,比如Android Studio可以使用Alt+Enter):
Observable<NewsMessage> news = newsCollection
.snapshots()
.expand((snapShot) {
return snapShot.documents;
})
.map<NewsMessage>((document) {
return NewsMessage.fromMap(document.data);
});
注意点
RxDart的transforming函数应该只用来处理数据流,所以不要尝试在这些函数中修改变量/状态(variables/state),这些代码请写在.listen
中。所以,不要这么写:
Observable.fromFuture(getProduct())
.map<Product>((jsonString) {
var product = Product.fromJson(jsonString);
database.save(product);
setState((){ _product = product });
return product;
}).listen();
而是这么写:
Observable.fromFuture(getProduct())
.map<Product>((jsonString) => Product.fromJson(jsonString))
.listen( (product) {
database.save(product);
setState((){ _product = product });
});
map()
函数的唯一作用就是数据转换,不要在里面做任何多余的操作。在map函数里面写其他操作也会降低代码的可读性,也容易隐藏一些bug。
资源释放
为了防止内存泄漏,请在适当的时候调用subscriptions
的cancel()
方法,或者者dispose
掉你的StreamControllers
,或者关闭你的Subjects
。
网友评论