(一)flatMap (mergeMap) 、 concatMap、switchMap 区别
flatMap它是mergeMap的一个别名, mergeMap
mergeMap在处理 inner Observables时是立即执行,也就是说所有的inner Observables在执行下一个inner Observable 时没有等待上一个 inner Observable是否结束,同时也不能取消上一个 inner Observable的执行且inner Observable的执行还是无序的,所有的inner Observables 是直接合并到最终输出的Observable上。
concatMap
concatMap是按输入的顺序执行的,concatMap在处理下一个inner Observable前会等待上一个inner Observable执行的结束,同时concatMap和mergeMap一样不能取消inner Observable的执行。
switchMap
switchMap会优先处理最近的inner Observable,与concatMap和mergeMap不同的是switchMap在执行下一个inner Observable前可以取消上一个inner Observable的执行,在Observable被取消后emit的 inner Observable会被丢弃(不包括最终的Observable)
(二)forkJoin, zip, combineLatest之间的区别
forkJoin, zip, combineLatest是rxjs中的合并操作符,用于对多个流进行合并。很多人第一次接触rxjs时往往分不清它们之间的区别,其实这很正常,因为当你准备用来合并的流是那种只会发射一次数据就关闭的流时(比如http请求),就结果而言这三个操作符没有任何区别。
const ob1 = Rx.Observable.of(1).delay(1000);
const ob2 = Rx.Observable.of(2).delay(2000);
const ob3 = Rx.Observable.of(3).delay(3000);
Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data));
// [1, 2, 3]
// [1, 2, 3]
// [1, 2, 3]
// 都是在3秒的时候打印
rxjs中很多操作符功能相近,只有当其操作的流会多次发射数据时才会体现出它们之间的区别,下面我们来详细解释forkJoin, zip, 和combineLatest。
一个基本概念
首先我们要知道,一个流(或者说Observable序列)的生命周期中,每次发射数据会发出next信号(Notification),结束发射时会发出complete信号,发生错误时发出error信号,三个信号分别对应observer的三个方法。next信号会由于发射源的不同发射0到多次;而complete和error仅会发射其中一个,且只发射一次,标志着流的结束。
subscribe接收一个observer对象用来处理上述三种信号,只传入一个函数会被认为是next方法,因此传入subscribe的next方法会执行0到N次,N为序列正常发射信号的次数。
forkJoin
用forkJoin合并的流,会在每个被合并的流都发出结束信号时发射一次也是唯一一次数据。假设我们有两个流:
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);
Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data));
// ["ob1:2", "ob2:1"]
ob1会在发射完第三个数据时停止发射,ob2会在发射完第二个数据时停止,而forkJoin合并后的流会等到ob1和ob2都结束时,发射一次数据,也就是触发一次subscribe里的回调,接收到的数据为ob1和ob2发射的最后一次数据的数组。
zip
zip工作原理如下,当每个传入zip的流都发射完毕第一次数据时,zip将这些数据合并为数组并发射出去;当这些流都发射完第二次数据时,zip再次将它们合并为数组并发射。以此类推直到其中某个流发出结束信号,整个被合并后的流结束,不再发射数据。
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);
Rx.Observable.zip(ob1, ob2).subscribe({
next: (data) => console.log(data),
complete: () => console.log('complete')
});
// ["ob1:0", "ob2:0"] ob1等待ob2发射数据,之后合并
// ["ob1:1", "ob2:1"] 此时ob2结束,整个合并的流也结束
// "complete"
zip和forkJoin的区别在于,forkJoin仅会合并各个子流最后发射的一次数据,触发一次回调;zip会等待每个子流都发射完一次数据然后合并发射,之后继续等待,直到其中某个流结束(因为此时不能使合并的数据包含每个子流的数据)。
combineLatest
combineLatest与zip很相似,combineLatest一开始也会等待每个子流都发射完一次数据,但是在合并时,如果子流1在等待其他流发射数据期间又发射了新数据,则使用子流最新发射的数据进行合并,之后每当有某个流发射新数据,不再等待其他流同步发射数据,而是使用其他流之前的最近一次数据进行合并。
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);
Rx.Observable.combineLatest(ob1, ob2).subscribe({
next: (data) => console.log(data),
complete: () => console.log('complete')
});
// ["ob1:1", "ob2:0"] ob1等待ob2发射,当ob2发射时ob1已经发射了第二次数据,使用ob1的第二次数据
// ["ob1:2", "ob2:0"] ob1继续发射第三次也是最后一次数据,ob2虽然还未发射,但是可以使用它上一次的数据
// ["ob1:2", "ob2:1"] ob2发射第二次也是最后一次数据,使ob1上一次的数据。
// "complete"
(三)distinctUntilChanged 过滤
distinctUntilChanged 用于在连续的数据流中按照设定条件过滤数据源。
只有当前值与之前最后一个值不同时才将其发出。distinctUntilChanged 在非连续数据流的时候,是会自动跳过不执行的,比如以of(1) 开头的数据流。
网友评论