单播和多播
之前我们看到的所有Observable都是单播的,即源头有值发出时,不管这个Observable被几个Observer订阅,它一次只会给一个Observer推送多播:当源头有值发出时,这个值会同一时间发给所有的Observer
简单来说,单播与多播的区别类似于concat和merge的区别
- 单播
按顺序依次执行,A先走完再开始B
const source$ = range(5)
source$.subscribe(val => console.log('A' + val))
source$.subscribe(val => console.log('B' + val))
//A0
//A1
//A2
//B0
//B1
//B2
- 多播
这里的A和B同时被调用
const source$ = range(2);
const subject$ = new Subject();
subject$.subscribe(val => console.log('A:' + val))
subject$.subscribe(val => console.log('B:' + val))
source$.subscribe(subject$);
//A0
//B0
//A1
//B1
Subject
- 什么是subject?
subject是一种特殊的Observable,而且是多播的。
既然Observable,就可以被subscribe,只不过每个observer都会存一份list,一旦有值发出,每个observer都会同时收到值
subject还是observer,可以执行next(),error(),complete()的方法
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
subject.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
subject.next(1);
subject.next(2);
//observerA:1
//observerB:1
//observerA:2
//observerB:2
有了subject之后,我们可以直接把subject传到subscribe中
const observable = from([1, 2])
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
subject.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
observable.subscribe(subject)
//A1
//B1
//A2
//B2
上面的例子中,通过subject将单播的Observable转成了多播的,这是其中一种方式,rxjs提供了一些多播类的操作符也可以将单播的Observable转成多播的
Subject三个子类
BehaviorSubject
BehaviorSubject可以储存最近发送的一个值,只要有新的Observer订阅,就立马推送当前的最新值
const subject = new Subject()
const observerA = {
next: val => console.log('A next:' + val),
error: err => console.log('A error:' + err),
complete: () => console.log('A complete')
}
const observerB = {
next: val => console.log('B next:' + val),
error: err => console.log('B error:' + err),
complete: () => console.log('B complete')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.subscribe(observerB)
//这里只能接收到2之后的推送
subject.next(3);
//A next:1
//A next:2
//A next:3
//B next:3
如果想要拿到2的推送,可以使用BehaviorSubject,且必须要指定一个初始值
const subject = new BehaviorSubject(0)
const observerA = {
next: val => console.log('A next:' + val),
error: err => console.log('A error:' + err),
complete: () => console.log('A complete')
}
const observerB = {
next: val => console.log('B next:' + val),
error: err => console.log('B error:' + err),
complete: () => console.log('B complete')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.subscribe(observerB)
subject.next(3);
//A next:1
//A next:2
//B next:2
//A next:3
//B next:3
ReplaySubject
ReplaySubject可以指定推送最近的多少个值给新的Observer,而BehaviorSubject只会推最近的一个值
const subject = new ReplaySubject(2)
const observerA = {
next: val => console.log('A next:' + val),
error: err => console.log('A error:' + err),
complete: () => console.log('A complete')
}
const observerB = {
next: val => console.log('B next:' + val),
error: err => console.log('B error:' + err),
complete: () => console.log('B complete')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe(observerB)
subject.next(4);
subject.next(5);
// 1 2 3 2 3 4 4 5 5
还可以指定第二个参数,设置缓存的有效期
缓存2个值,并且只在3秒内有效,超过3秒新的Observer将不会订阅任何值
const subject = new ReplaySubject(2, 3000)
subject.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
range(3).subscribe(val => subject.next(val))
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`ovserverB:${v}`)
})
}, 2000)//这里的时间不能超过3000ms
//observerA:0
//observerA:1
//observerA:2
//observerA:1
//observerA:2
AsyncSubject
只在Subject结束时,推送最后一个值。所以,在没有推送complete时,不会有任何响应
const subject = new AsyncSubject()
subject.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
subject.next(3);
subject.complete();
//observerA:3
//observerB:3
网友评论