rxdart学习笔记
rxdart是对流的操作,在dart里,有时如果可以用Future实现,会比用流来实现方便,毕竟流需要回调监听
例如使用dio发起多个并发请求:
response = await Future.wait([dio.post("/info"), dio.get("/token")]);
[TOC]
官方rxdart github地址
RxDart为Dart Streams 和StreamControllers增加了额外的功能。
Dart提供了一个非常不错的开箱即用的 Streams API,RxDart并没有试图为这个API提供一个替代方案,而是在其上添加了反应性扩展规范的功能
RxDart不提供自己的可观察类作为Dart流的替代品。相反,它提供了许多额外的流类、操作符(流类上的扩展方法)和主题。
如果您熟悉其他语言的可观察性,请参阅 the Rx Observables vs Dart Streams comparison chart,以了解两者之间的显著区别
RxDart通过三种方式向Dart Streams添加功能:
Stream Classes——创建具有特定功能的流,例如将多个流合并在一起。
Extension Methods - 将源流转换为具有不同功能的新流,例如节流或缓冲事件
Subjects - 具有额外能力的流控制器
Stream Classes
Stream Classes提供了创建流的不同方法例如:Stream. fromiterable
或Stream. periodic
.RxDart为各种任务提供了额外的流类,比如将流合并在一起!
可以通过两种方式构建RxDart提供的流。下面的例子在功能上是等价的:
1.直接实例化流类。
例如:final mergedStream = MergeStream([myFirstStream, mySecondStream]);
2.使用来自Rx类的静态工厂,这对于发现RxDart提供的流类型非常有用。在内部,这些工厂只是调用相应的流构造函数。
例如:final mergedStream = Rx.merge([myFirstStream, mySecondStream]);
类/静态工厂的列表
ConcatStream / Rx.concat
连接所有指定的流序列,只要前一个流序列成功终止。它通过逐个订阅每个流、发送所有项并在订阅下一个流之前完成此操作
rxdart-concat.pngRx.concat([
Stream.value(1),
Rx.timer(2, Duration(milliseconds: 1000)),
Stream.value(3)
])
.listen(print); // prints 1, 2, 3
ConcatEagerStream / Rx.concatEager
连接所有指定的流序列,只要前一个流序列成功终止。在concatEager的情况下,不是订阅一个接一个的流,而是立即订阅所有流。然后,在前一个流完成项目发射之后,在正确的时间捕获和发射事件。如果有多个耗时的流,连接后的流完成时,总用时会比 Rx.concat 总用时少。
Rx.concatEager([
Stream.value(1),
Rx.timer(2, Duration(days: 1)),
Stream.value(3)
])
.listen(print); // prints 1, 2, 3
DeferStream / Rx.defer
等待:延迟工厂将一直等待,直到观察者订阅它,然后它使用给定的工厂函数创建一个流。
在某些情况下,等到最后一分钟(即订阅时)才生成流可以确保该流包含最新的数据。
默认情况下,延迟流是单次订阅。然而,使它们可重用是可能的。
Rx.defer(() => Stream.value(1))
.listen(print); //prints 1
MergeStream / Rx.merge
合并: 将给定流发出的项扁平化为单个流序列。
Rx.merge([
Rx.timer(1, Duration(milliseconds: 1000)),
Stream.value(2),
Rx.timer(3, Duration(milliseconds: 1000)),
]).listen((value){
print('value:$value\ttime:${DateTime.now().toIso8601String()}');
});
rxdart-merge.png
NeverStream / Rx.never
从不:返回一个不终止的流序列,可用于表示无限的持续时间。
never操作符具有非常具体和有限的行为。这些对于测试目的很有用,有时也可以用于与其他流结合,或者作为期望其他流作为参数的操作符的参数。
Rx.never().listen((value){
print('never');
}); // Neither prints nor terminates
print('end');
RaceStream / Rx.race
给定两个或多个源流,仅从其中第一个流发出所有项,以发出一个项或通知。只发出最快执行完的流的值,其他流的就不会发出来了
Rx.race([
Rx.timer(1, Duration(days: 1)),
Rx.timer(2, Duration(days: 2)),
Rx.timer(3, Duration(seconds: 1))
]).listen(print); // prints 3
rxdart-race.png
RepeatStream / Rx.repeat
重复:创建将重新创建并重新侦听源流指定次数的流,直到流成功终止。
如果没有指定count,则无限重复。
RepeatStream((int repeatCount) =>
Stream.value('repeat index: $repeatCount'), 3)
.listen((i) => print(i); // Prints 'repeat index: 0, repeat index: 1, repeat index: 2'
RetryStream / Rx.retry
重试:创建将重新创建并重新侦听源流指定次数的流,直到流成功终止。
如果没有指定重试计数,它会无限地重试。如果满足重试计数,但流没有成功终止,将抛出一个RetryError。RetryError将包含导致失败的所有错误和堆栈跟踪。
Rx.retry(() { Stream.value(1); })
.listen((i) => print(i); // Prints 1
Rx
.retry(() {
Stream.value(1).concatWith([Stream.error(Error())]);
}, 1)
.listen(print, onError: (e, s) => print(e); // Prints 1, 1, RetryError
RetryWhenStream / Rx.retryWhen
重试:创建一个流,该流将在通知器发出新值时重新创建并重新侦听源流。如果源流发出错误或它完成,则流终止。
如果retryWhenFactory发出错误,将抛出一个RetryError。RetryError将包含导致失败的所有错误和堆栈跟踪。
//Basic Example
RetryWhenStream<int>(
() => Stream<int>.fromIterable(<int>[1]),
(dynamic error, StackTrace s) => throw error,
).listen(print); // Prints 1
//Periodic Example
RetryWhenStream<int>(
() => Stream<int>
.periodic(const Duration(seconds: 1), (int i) => i)
.map((int i) => i == 2 ? throw 'exception' : i),
(dynamic e, StackTrace s) {
return Rx.timer('random value', const Duration(milliseconds: 200);
},
).take(4).listen(print); // Prints 0, 1, 0, 1
//Complex Example
//出错时streamFactory出错时,进入retryWhenFactory,
// 如果retryWhenFactory返回是一个普通流,则重新创建
//如果retryWhenFactory返回是一个Stream.error(e),则不重建,全部结束
bool errorHappened = false;
RetryWhenStream(
() => Stream
.periodic(const Duration(seconds: 1), (i) => i)
.map((i) {
if (i == 3 && !errorHappened) {
throw 'We can take this. Please restart.';
} else if (i == 4) {
throw 'It\'s enough.';
} else {
return i;
}
}),
(e, s) {
errorHappened = true;
if (e == 'We can take this. Please restart.') {
return Stream.value('Ok. Here you go!');
} else {
return Stream.error(e);
}
},
).listen(
print,
onError: (e, s) => print(e),
); // Prints 0, 1, 2, 0, 1, 2, 3, RetryError
SequenceEqualStream / Rx.sequenceEqual
对比:确定两个流是否发出相同的项序列。您可以提供一个可选的equals处理程序来确定是否相等。
Rx.sequenceEqual(
Stream.fromIterable([1, 2, 3, 4, 5]),
Stream.fromIterable([1, 2, 3, 4, 5])
).listen(print); // prints true
rxdart-sequenceEqual.png
SwitchLatestStream / Rx.switchLatest
将发出流的流(又名“更高阶流”)转换为发出这些流最近发出的项的单个流。
当新流从源流发出并订阅新流时,此流将取消对先前发出的流的订阅。
final switchLatestStream = SwitchLatestStream<String>(
Stream.fromIterable(<Stream<String>>[
Rx.timer('A', Duration(seconds: 2)),
Rx.timer('B', Duration(seconds: 1)),
Stream.value('C'),
]),
);
// Since the first two Streams do not emit data for 1-2 seconds, and the
// 3rd Stream will be emitted before that time, only data from the 3rd
// Stream will be emitted to the listener.
switchLatestStream.listen(print); // prints 'C'
TimerStream / Rx.timer
在一段指定的时间后发出给定的值。
Rx.timer('hi', Duration(minutes: 1))
.listen((i) => print(i); // print 'hi' after 1 minute
CombineLatestStream (combine2, combine3... combine9) / Rx.combineLatest2...Rx.combineLatest9
每当任何源流序列发出一个项时,使用组合器函数将给定流合并为一个流序列。
在所有流至少发出一个项之前,流不会发出。
Basic Example
这个构造函数接受一个Iterable<Stream<T>>
,并在任何值从源流更改时输出一个Stream<Iterable<T>>
。这对于动态数量的源流非常有用!
CombineLatestStream.list<String>([
Stream.fromIterable(['a']),
Stream.fromIterable(['b']),
Stream.fromIterable(['C', 'D'])])
.listen(print); //prints ['a', 'b', 'C'], ['a', 'b', 'D']
Example with combiner
如果您希望将值列表合并到您面前的新对象中
CombineLatestStream(
[
Stream.fromIterable(['a']),
Stream.fromIterable(['b']),
Stream.fromIterable(['C', 'D'])
],
(values) => values.last
)
.listen(print); //prints 'C', 'D'
Example with a specific number of Streams
如果您希望将特定数量的流与每个流的值的适当类型信息组合在一起,请使用combine2 - combine9操作符。
CombineLatestStream.combine2(
Stream.fromIterable([1]),
Stream.fromIterable([2, 3]),
(a, b) => a + b,
)
.listen(print); // prints 3, 4
Available Extensions
BufferExtensions ConcatExtensions
ConnectableStreamExtensions DebounceExtensions DefaultIfEmptyExtension DelayExtension
DistinctUniqueExtension DoExtensions EndWithExtension EndWithManyExtension
ExhaustMapExtension FlatMapExtension GroupByExtension IgnoreElementsExtension
IntervalExtension MapToExtension MaterializeExtension MaxExtension MergeExtension
MinExtension OnErrorExtensions PairwiseExtension SampleExtensions ScanExtension SkipUntilExtension StartWithExtension StartWithManyExtension SwitchIfEmptyExtension SwitchMapExtension TakeUntilExtension
TakeWhileInclusiveExtension ThrottleExtensions TimeIntervalExtension TimeStampExtension
WhereTypeExtension WindowExtensions WithLatestFromExtensions ZipWithExtension
ForkJoinStream (join2, join3... join9) / Rx.forkJoin2...Rx.forkJoin9
当您拥有一组流并且只关心每个流的最终发出值时,最好使用此操作符。一个常见的用例是,如果您希望在页面加载(或其他事件)上发出多个请求,并且只希望在收到所有响应时采取行动。
在这种情况下,它类似于你如何使用future。
请注意,如果提供给forkJoin错误的任何一个内部流,如果您没有在内部流上正确地捕捉到错误,那么您将丢失任何其他可能或已经完成的流的值。
如果您只关心所有内部流的成功完成,那么您可以从外部捕获错误。同样值得注意的是,如果您有一个排放多个项目的流,并且您关心以前的排放,那么forkJoin不是正确的选择。
在这些情况下,您最好使用像combineLatest或zip这样的操作符。
Basic Example
这个构造函数接受一个Iterable<Stream<T>>
,并在任何值从源流更改时输出一个Stream<Iterable<T>>
。这对于动态数量的源流非常有用!
ForkJoinStream.list<String>([
Stream.fromIterable(['a']),
Stream.fromIterable(['b']),
Stream.fromIterable(['C', 'D'])])
.listen(print); //prints ['a', 'b', 'D']
Example with combiner
如果您希望将值列表合并到您面前的新对象中
CombineLatestStream(
[
Stream.fromIterable(['a']),
Stream.fromIterable(['b']),
Stream.fromIterable(['C', 'D'])
],
(values) => values.last
)
.listen(print); //prints 'D'
Example with a specific number of Streams
如果您希望将特定数量的流与每个流的值的适当类型信息组合在一起,请使用combine2 - combine9操作符。
ForkJoinStream.combine2(
Stream.fromIterable([1]),
Stream.fromIterable([2, 3]),
(a, b) => a + b,
)
.listen(print); // prints 4
Available Extensions
BufferExtensions ConcatExtensions ConnectableStreamExtensions
DebounceExtensions DefaultIfEmptyExtension DelayExtension
DistinctUniqueExtension DoExtensions EndWithExtension
EndWithManyExtension ExhaustMapExtension FlatMapExtension
GroupByExtension IgnoreElementsExtension IntervalExtension MapToExtension
MaterializeExtension MaxExtension MergeExtension
MinExtension OnErrorExtensions PairwiseExtension
SampleExtensions ScanExtension SkipUntilExtension
StartWithExtension StartWithManyExtension SwitchIfEmptyExtension
SwitchMapExtension TakeUntilExtension TakeWhileInclusiveExtension
ThrottleExtensions TimeIntervalExtension TimeStampExtension WhereTypeExtension
WindowExtensions WithLatestFromExtensions ZipWithExtension
RangeStream / Rx.range
返回一个流,该流发出指定范围内的整数序列。
Rx.range(1, 3).listen((i) => print(i); // Prints 1, 2, 3
Rx.range(3, 1).listen((i) => print(i); // Prints 3, 2, 1
ZipStream (zip2, zip3, zip4, ..., zip9) / Rx.zip...Rx.zip9
当所有的流序列在对应的索引处产生了一个元素时,使用给定的zippers函数将指定的流合并为一个流序列。
它严格按照顺序应用这个函数,因此新流发出的第一项将是应用于流#1发出的第一项和流#2发出的第一项的函数的结果;新ZipStream发出的第二项将是应用于流#1发出的第二项和流#2发出的第二项的函数的结果;等等。它将只发出与源流发出的项目数量相等的项目,而源流发出的项目数量最少。
rxdart-zip.pngBasic Example
ZipStream(
[
Stream.fromIterable(['A']),
Stream.fromIterable(['B']),
Stream.fromIterable(['C', 'D']),
],
(values) => values.join(),
).listen(print); // prints 'ABC'
Example with a specific number of Streams
如果您希望压缩特定数量的流以及每个流值的适当类型信息,请使用zip2 - zip9操作符
ZipStream.zip2(
Stream.fromIterable(['A']),
Stream.fromIterable(['B', 'C']),
(a, b) => a + b,
)
.listen(print); // prints 'AB'
Interval / Stream.periodic
创建一个流,在给定的持续时间之后发出流中的每个项。
Stream.periodic(Duration(microseconds: 500), (int i) {
return i;
}).take(10).listen(print);
扩展方法
RxDart提供的扩展方法可以在任何流上使用。它们将源流转换为具有附加功能的新流,例如缓冲或节流事件。
Stream.fromIterable([1, 2, 3])
.throttleTime(Duration(seconds: 1))
.listen(print); // prints 3
扩展方法列表
buffer
创建一个流,其中每个项都是包含源序列项的列表。
每当窗口发出一个事件时,就会发出此列表。
Stream.periodic(Duration(milliseconds: 100), (i) => i)
.buffer(Stream.periodic(Duration(milliseconds: 160), (i) => i))
.listen(print); // prints [0, 1] [2, 3] [4, 5] ...
bufferCount
通过计数从源流中缓冲一些值,然后释放缓冲区并清除它,并启动一个新的缓冲区each startBufferEvery值。如果没有提供startBufferEvery,则在源的开始处以及在每个缓冲区关闭并发出时立即启动新的缓冲区。
count是发出的缓冲区的最大大小
RangeStream(1, 4)
.bufferCount(2)
.listen(print); // prints [1, 2], [3, 4] done!
如果startBufferEvery是2,那么将在源文件的每一个其他值上启动一个新的缓冲区。默认情况下,在源文件的开头处启动一个新的缓冲区。
RangeStream(1, 5)
.bufferCount(3, 2)
.listen(print); // prints [1, 2, 3], [3, 4, 5], [5] done!
bufferTest
创建一个流,其中每个项都是包含源序列项的列表,在测试通过时进行批处理。
Stream.periodic(Duration(milliseconds: 100), (int i) => i)
.bufferTest((i) => i % 2 == 0)
.listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ...
bufferTime
创建一个流,其中每个项都是一个列表,其中包含源序列中的项,在带有持续时间的时间段内进行采样。
Stream.periodic(Duration(milliseconds: 100), (int i) => i)
.bufferTime(Duration(milliseconds: 220))
.listen(print); // prints [0, 1] [2, 3] [4, 5] ...
concatWith
返回一个流,该流从当前流中发出所有项,然后依次从给定流中发出所有项。
TimerStream(1, Duration(seconds: 10))
.concatWith([Stream.fromIterable([2])])
.listen(print); // prints 1, 2
debounce
消除抖動
转换流,以便在一个窗口已完成时仅从源序列发出项,而不由源序列发出另一个项。
此窗口是在发出最后一个取消声明事件之后创建的。您可以使用上一个被取消的事件的值来确定下一个窗口的长度。
窗口将一直打开,直到第一个窗口事件发出。
debounce过滤由源流发出的、随后被另一个发出的项快速跟随的项。
Stream.fromIterable([1, 2, 3, 4])
.debounce((_) => TimerStream(true, Duration(seconds: 1)))
.listen(print); // prints 4
rxdart-debounce.png
debounceTime
转换流,以便当持续时间定义的时间跨度经过时,仅从源序列发出项,而不会由源序列发出另一个项。
此时间跨度在发出最后一个被取消的事件之后开始。
debounceTime过滤掉源流发出的、紧接其后的另一个发出的项。
Stream.fromIterable([1, 2, 3, 4])
.debounceTime(Duration(seconds: 1))
.listen(print); // prints 4
rxdart-debounceTime.png
defaultIfEmpty
从源流发出项,或者如果源流不发出任何东西,则发出单个默认项。
Stream.empty().defaultIfEmpty(10).listen(print); // prints 10
delay
延迟运算符通过在发出源流的每个项之前暂停一个特定的时间增量(您指定的)来修改它的源流。这样做的效果是将流发出的整个项目序列按指定的增量在时间上向前移动。
Stream.fromIterable([1, 2, 3, 4])
.delay(Duration(seconds: 1))
.listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately
dematerialize
将物化流中的onData、onDone和onError通知对象转换为正常的onData、onDone和onError事件。
当一个流被物化后,它会发出onData、onDone和onError事件作为通知对象。去物质化只是通过将通知对象转换回正常的事件流来逆转这种情况。
Example
Stream<Notification<int>>
.fromIterable([Notification.onData(1), Notification.onDone()])
.dematerialize()
.listen((i) => print(i)); // Prints 1
Error example
Stream<Notification<int>>
.fromIterable([Notification.onError(Exception(), null)])
.dematerialize()
.listen(null, onError: (e, s) { print(e) }); // Prints Exception
distinctUnique
警告:在其他Rx实现中通常称为distinct。创建一个流,如果之前已经发出数据事件,则跳过该流。
相等性由提供的equals和hashCode方法确定。如果省略了这些,则使用最后提供的数据元素上的'=='操作符和hashCode。
如果返回的流是广播流,则返回的流是广播流。如果广播流被侦听超过一次,每个订阅将分别执行equals和hashCode测试。
rxdart-distinctUnique.pngdoOnCancel
在流订阅被取消时调用给定的回调函数。通常在其他实现中称为doOnUnsubscribe或doOnDispose。
final subscription = TimerStream(1, Duration(minutes: 1))
.doOnCancel(() => print('hi')).listen(print);
subscription.cancel(); // prints 'hi'
doOnData
当流发出项时调用给定的回调函数。在其他实现中,这称为doOnNext。
Stream.fromIterable([1, 2, 3])
.doOnData(print)
.listen(null); // prints 1, 2, 3
doOnDone
在流完成发送项时调用给定的回调函数。在其他实现中,这称为doOnComplete(d)。
Stream.fromIterable([1, 2, 3])
.doOnDone(() => print('all set'))
.listen(null); // prints 'all set'
doOnEach
当流发出数据、发出错误或发出done时,调用给定的回调函数。回调接收一个通知对象。
通知对象包含事件的类型(OnData、onDone或OnError)以及所发出的项或错误。在onDone的情况下,没有数据作为通知的一部分发出。
Stream.fromIterable([1])
.doOnEach(print)
.listen(null);
//Notification{kind: Kind.OnData, value: 1, error: null, stackTrace: null}
//Notification{kind: Kind.OnDone, value: null, error: null, stackTrace: null}
doOnError
当流发出错误时调用给定的回调函数
Stream.error(Exception())
.doOnError((error, stacktrace) => print('oh no'))
.listen(null); // prints 'Oh no'
//oh no
//Unhandled exception:
doOnListen
在第一次侦听流时调用给定的回调函数。
Stream.fromIterable([1])
.doOnListen(() => print('Is someone there?'))
.listen(null); // prints 'Is someone there?'
doOnPause
在流订阅暂停时调用给定的回调函数。
final subscription = Stream.fromIterable([1]).doOnPause((resumeSignal) {
print('Gimme a minute please resumeSignal:$resumeSignal');
}).listen(print);
subscription.pause(); // prints 'Gimme a minute please'
doOnResume
当流订阅恢复接收项时调用给定的回调函数。
final subscription = Stream.fromIterable([1])
.doOnResume(() => print('Let's do this!'))
.listen(null);
subscription.pause();
subscription.resume(); 'Let's do this!'
endWith
在关闭之前向源流追加一个值。
Stream.fromIterable([2]).endWith(1).listen(print); // prints 2, 1
endWithMany
在关闭之前,将一系列值作为最终事件追加到源流。
Stream.fromIterable([2]).endWithMany([1, 0]).listen(print); // prints 2, 1, 0
exhaustMap
使用给定映射器将项目从源流转换为流。它忽略源流中的所有项,直到新流完成。
当您有一个嘈杂的源流,并且只希望在之前的异步操作完成后进行响应时,该功能非常有用。
RangeStream(0, 2).interval(Duration(milliseconds: 50))
.exhaust((i) =>
TimerStream(i, Duration(milliseconds: 75)))
.listen(print); // prints 0, 2
flatMap
使用给定的映射器函数将每个发出的项转换为流。将侦听新创建的流,并开始向下游发送项。
每个流发出的项将按照它们到达的顺序向下游发出。换句话说,序列被合并在一起。
RangeStream(4, 1)
.flatMap((i) => TimerStream(i, Duration(minutes: i))
.listen(print); // prints 1, 2, 3, 4
flatMapIterable
将每个项转换为流。流必须返回一个Iterable。然后,迭代器中的每个项将逐个发出。
用例:您可能有一个返回项目列表的API,例如流< list >。但是,您可能希望对单个项而不是列表本身进行操作。这就是flatMapIterable的工作。
RangeStream(1, 4)
.flatMapIterable((i) => Stream.fromIterable([[i]])
.listen(print); // prints 1, 2, 3, 4
groupBy
分组:GroupBy操作符将发出项的流划分为发出GroupByStream的流,每个流都发出来自原始源流的项的某个子集。
GroupByStream的作用类似于常规流,但添加了一个“key”属性,它从grouper函数接收其类型和值。
具有相同键的所有项都由相同的GroupByStream发出。
interval
间隔:创建一个流,在给定的持续时间之后发出流中的每个项。
mapTo
映射到:每当源流发出一个值时,就在输出流上发出给定的常数值。
Stream.fromIterable([1, 2, 3, 4])
.mapTo(true)
.listen(print); // prints true, true, true, true
materialize
将onData、on Done和onError事件转换为Notification对象,这些通知对象将传递到下游的onData侦听器中。
Notification对象包含事件的类型(OnData、onDone或OnError)以及所发出的项或错误。在onDone的情况下,没有数据作为通知的一部分发出。
例如:Stream.fromIterable(1) .materialize() .listen((i) => print(i));//打印onData和onDone通知
Stream<int>.error(Exception())
.materialize()
.listen((i) => print(i)); // Prints onError Notification
max
将流转换为Future ,该Future 将使用该流发出的最大项完成。
这类似于在列表中查找最大值,但是这些值是异步的。
final max = await Stream.fromIterable([1, 2, 3]).max();
print(max); // prints 3
mergeWith
将由多个流发出的项组合为单个的项流。这些项是按照它们由其源发出的顺序发出的。
TimerStream(1, Duration(seconds: 10))
.mergeWith([Stream.fromIterable([2])])
.listen(print); // prints 2, 1
min
将流转换为使用流发出的最小项完成的Future。
这类似于在列表中查找最小值,但是这些值是异步的!
final min = await Stream.fromIterable([1, 2, 3]).min();
print(min); // prints 1
onErrorResume
拦截错误事件并切换到由所提供的recoveryFn创建的恢复流。
onErrorResume操作符从源流中截取onError通知。它没有将错误传递给任何侦听器,而是将其替换为由recoveryFn创建的另一个项流。
recoveryFn接收发出的错误并返回一个流。您可以在recoveryFn中执行逻辑,根据发出的错误类型返回不同的流
如果您不需要根据发出的错误类型执行逻辑,请考虑使用onErrorResumeNext或onErrorReturn。
Stream.error(StateError('state error'))
.onErrorResume((dynamic e) =>
Stream.fromIterable([e is StateError ? 1 : 0]))
.listen(print); // prints 0
onErrorResumeNext
拦截错误事件,在这种情况下切换到给定的恢复流
onErrorResumeNext操作符从源流中截取一个onError通知。它不会将错误传递给任何侦听器,而是将其替换为另一个项流。
如果您需要根据发出的错误类型执行逻辑,请考虑使用onErrorResume。
ErrorStream(Exception())
.onErrorResumeNext(Stream.fromIterable([1, 2, 3]))
.listen(print); // prints 1, 2, 3
onErrorReturn
指示流在遇到错误时发出特定项,然后正常终止。
onErrorReturn操作符从源流中截取onError通知。它没有将其传递给任何观察者,而是将其替换为给定的项,然后正常终止。
如果您需要根据发出的错误类型执行逻辑,请考虑使用onErrorReturnWith
ErrorStream(Exception())
.onErrorReturn(1)
.listen(print); // prints 1
onErrorReturnWith
指示流在遇到错误时发出由returnFn创建的特定项,然后正常终止。
onErrorReturnWith操作符从源流中截取onError通知。它没有将其传递给任何观察者,而是将其替换为给定的项,然后正常终止。
returnFn接收发出的错误并返回一个流。您可以在returnFn中执行逻辑,根据发出的错误类型返回不同的流。
如果您不需要根据所发出的错误类型执行逻辑,请考虑使用onErrorReturn。
ErrorStream(Exception())
.onErrorReturnWith((e) => e is Exception ? 1 : 0)
.listen(print); // prints 1
pairwise
将第n和第n-1个事件成对发出。
RangeStream(1, 4)
.pairwise()
.listen(print); // prints [1, 2], [2, 3], [3, 4]
sample
发出自sampleStream上次发出以来源流最近发出的项(如果有的话)。
Stream.fromIterable([1, 2, 3])
.sample(TimerStream(1, Duration(seconds: 1)))
.listen(print); // prints 3
sampleTime
在循环时间范围内(由持续时间定义)发出源流自上次发出以来最近发出的项(如果有的话)
Stream.fromIterable([1, 2, 3])
.sampleTime(Duration(seconds: 1))
.listen(print); // prints 3
scan
对流序列应用累加器函数并返回每个中间结果。可选的种子值用作初始累加器值。
Stream.fromIterable([1, 2, 3])
.scan((acc, curr, i) => acc + curr, 0)
.listen(print); // prints 1, 3, 6
skipUntil
仅在给定流发出项后才开始发出项。
MergeStream([
Stream.fromIterable([1]),
TimerStream(2, Duration(minutes: 2))
])
.skipUntil(TimerStream(true, Duration(minutes: 1)))
.listen(print); // prints 2;
startWith
向源流添加一个值。
Stream.fromIterable([2]).startWith(1).listen(print); // prints 1, 2
startWithMany
在源流前添加一系列值。
Stream.fromIterable([3]).startWithMany([1, 2])
.listen(print); // prints 1, 2, 3
switchIfEmpty
当原始流没有发出项时,此操作符订阅给定的回退流,并从该流发出项。
这在使用来自多个数据源的数据时特别有用。例如,在使用存储库模式时。假设您有一些数据需要加载,您可能希望从最快的接入点开始,然后一直下降到最慢的接入点。例如,首先查询内存中的数据库,然后查询文件系统上的数据库,如果数据不在本地机器上,则查询网络调用。
这可以通过switchIfEmpty非常简单地实现!
switchMap
使用给定的映射器函数将每个发出的项转换为流。新创建的流将被侦听并开始发送项,而之前创建的任何流将停止发送。
switchMap操作符类似于flatMap和concatMap方法,但是它只从最近创建的流发出项。
例如,当您只想要来自异步api的最新状态时,这可能非常有用。
takeUntil
返回源流序列中的值,直到另一个流序列生成一个值。
MergeStream([
Stream.fromIterable([1]),
TimerStream(2, Duration(minutes: 1))
])
.takeUntil(TimerStream(3, Duration(seconds: 10)))
.listen(print); // prints 1
takeWhileInclusive
发出源流所发出的值,只要每个值都满足给定的测试。当某个值不满足测试时,它将发出该值作为最终事件,然后完成。
Stream.fromIterable([2, 3, 4, 5, 6, 1, 2, 3])
.takeWhileInclusive((i) => i < 4)
.listen(print); // prints 2, 3, 4
throttle
当窗口打开时,只发出源流发出的第一项。
如果trailing为真,则会发射最后一项
您可以使用上一个被调节的事件的值来确定下一个window的长度。
Stream.fromIterable([1, 2, 3])
.throttle((_) => TimerStream(true, Duration(seconds: 1)))
throttleTime
在持续时间范围内仅发出源流发出的第一项。
如果尾随为真,则会发射最后一项
Stream.fromIterable([1, 2, 3])
.throttleTime(Duration(seconds: 1))
timeInterval
记录流序列中连续值之间的时间间隔。
Stream.fromIterable([1])
.interval(Duration(seconds: 1))
.timeInterval()
.listen(print); // prints TimeInterval{interval: 0:00:01, value: 1}
timestamp
将源流发出的每个项包装在一个带有时间戳的对象中,该对象包括发出的项和发出项的时间。
Stream.fromIterable([1])
.timestamp()
.listen((i) => print(i)); // prints 'TimeStamp{timestamp: XXX, value: 1}';
whereType
这个转换器是Stream.where然后是Stream.cast缩写。
不匹配T的事件被过滤掉,结果流的类型为T。
Stream.fromIterable([1, 'two', 3, 'four'])
.whereType<int>()
.listen(print); // prints 1, 3
Stream.fromIterable([1, 'two', 3, 'four'])
.where((event) => event is int)
.cast<int>()
.listen(print); // prints 1, 3
window
创建一个流,其中每个项都是包含源序列项的流。
每当窗口发出一个事件时,就会发出此列表。
Stream.periodic(Duration(milliseconds: 100), (i) => i)
.window(Stream.periodic(Duration(milliseconds: 160), (i) => i))
.asyncMap((stream) => stream.toList())
.listen(print); // prints [0, 1] [2, 3] [4, 5] ...
windowCount
通过计数从源流中缓冲一些值,然后将该缓冲区作为流释放并清除它,并启动一个新的缓冲区each startBufferEvery值。如果没有提供startBufferEvery,则在源的开始处以及在每个缓冲区关闭并发出时立即启动新的缓冲区。
RangeStream(1, 4)
.windowCount(2)
.asyncMap((stream) => stream.toList())
.listen(print); // prints [1, 2], [3, 4] done!
windowTest
创建一个流,其中每个项都是包含来自源序列的项的流,在测试通过时进行批处理。
Stream.periodic(Duration(milliseconds: 100), (int i) => i)
.windowTest((i) => i % 2 == 0)
.asyncMap((stream) => stream.toList())
.listen(print); // prints [0], [1, 2] [3, 4] [5, 6] ...
windowTime
创建一个流,其中每个项都是一个流,包含来自源序列的项,在带有持续时间的时间框架中采样。
Stream.periodic(Duration(milliseconds: 100), (int i) => i)
.windowTime(Duration(milliseconds: 220))
.doOnData((_) => print('next window'))
.flatMap((s) => s)
.listen(print); // prints next window 0, 1, next window 2, 3, ...
withLatestFrom
扩展流类,使其能够将源流与另一个流中最后发出的项合并。
zipWith
返回一个流,该流使用给定的zippers函数将当前流与另一个流组合在一起。
Stream.fromIterable([1])
.zipWith(Stream.fromIterable([2]), (one, two) => one + two)
.listen(print); // prints 3
Subjects
Dart提供了StreamController类来创建和管理流。RxDart提供了两个额外的流控制器,它们具有额外的功能,称为Subjects:
BehaviorSubject-一个广播流控制器,缓存最新的增值或错误。当新的侦听器订阅流时,最新的值或错误将发送给侦听器。此外,您可以同步读取最后发出的值。
final subject = BehaviorSubject<int>();
subject.add(1);
subject.add(2);
subject.add(3);
subject.stream.listen(print); // prints 3
subject.stream.listen(print); // prints 3
subject.stream.listen(print); // prints 3
final subject = BehaviorSubject<int>.seeded(1);
subject.stream.listen(print); // prints 1
subject.stream.listen(print); // prints 1
subject.stream.listen(print); // prints 1
ReplaySubject-一个缓存添加值的广播流控制器。当新的侦听器订阅流时,缓存的值将发送到侦听器。
一个特殊的StreamController,它捕获已添加到控制器的所有项,并将它们作为第一个项发送给任何新侦听器。
此主题允许向侦听器发送数据、错误和done事件。当项目被添加到主题时,ReplaySubject将存储它们,当侦听流时,那些已记录的项将被发送到侦听器。在此之后,任何新事件都将被适当地发送到侦听器。可以通过设置maxSize值来限制存储事件的数量。
ReplaySubject,在默认情况下,是一个广播(aka hot)控制器,以履行Rx Subject contract.。这意味着主题的流可以被收听多次。
final subject = ReplaySubject<int>();
subject.add(1);
subject.add(2);
subject.add(3);
subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3
subject.stream.listen(print); // prints 1, 2, 3
final subject = ReplaySubject<int>(maxSize: 2);
subject.add(1);
subject.add(2);
subject.add(3);
subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
subject.stream.listen(print); // prints 2, 3
网友评论