multicast
之前演示过用Subject将单播Observable转成多播,multicast做的也是同样的事。(源头发送的值,会传送到各个observalbe中去)
- 通过subject将单播转成多播的方式
const observable = from([1, 2, 3])
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observer A:${v}`)
})
subject.subscribe({
next: (v) => console.log(`observer B:${v}`)
})
observable.subscribe(subject)
//observer A:1
//observer B:1
//observer A:2
//observer B:2
//observer A:3
//observer B:3
- 通过multicast实现相同的效果
const source = from([1, 2, 3])
const multicasted = source.pipe(multicast(new Subject())) as ConnectableObservable<number>
multicasted.subscribe({
next: (v) => console.log(`observer A:${v}`)
})
multicasted.subscribe({
next: (v) => console.log(`observer B:${v}`)
})
multicasted.connect()
这里的connect相当于之前的subscribe,返回的同样是subscription
- 官网案例
const source = interval(500)
const subject = new Subject();
const multicasted = source.pipe(multicast(subject)) as ConnectableObservable<number>
let subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
}, 1000)
setTimeout(() => {
subscription1.unsubscribe();
}, 2000)
setTimeout(() => {
subscription2.unsubscribe();
//subscription2退订后,source已经没有订阅者了,但要取消subscriptionConnect才是真正的退订
subscriptionConnect.unsubscribe();
}, 7000)
//observerA:0 1 2
//observerB:2
//observerA:3
//observerB:3 4 5 6 7 8 9 10 11 12 13
如果觉得
multicast
必须调用connect
方法才能推送值,还要multicasted.unsubscribe()
结束推送有些麻烦,就可以用refCount
refCount
当有
Observer
订阅源Observable
时,自动调用connect
,当Observer
全部unsubscribe
后,即没有Observer
了,自动调用connect().unsubscribe()
退订
const source = interval(500)
const subject = new Subject();
const multicasted = source.pipe(multicast(new (Subject)), refCount()) as ConnectableObservable<number>
let subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
//当有subscribe会自动调用,不再需要connect()
// subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
}, 1000)
setTimeout(() => {
subscription1.unsubscribe();
}, 2000)
setTimeout(() => {
subscription2.unsubscribe();
//同样,当不存在订阅的时候,会自动将subscriptionConnect取消
// subscriptionConnect.unsubscribe();
}, 7000)
通过使用refCount,效果和上面的相同
publish
multicast(new Subject)很常用,可用publish将其简化:
const subject = new Subject();
const refCount = source.pipe(multicast(subject),refCount())
//等价于
const refCounted = source.pipe(publish(),refCount())
与Subject类似,publish也有三种变种方法:
- publishBehavior(0) => new BehaviorSubject(0)
- publishReplay(2) => new ReplaySubject(2)
- publishLast() => new AsyncSubject()
- publishBehavior
与BehaviorSubject()略有不同,observserB并没有推送
const source = range(1, 5)
const refCounted = source.pipe(publishBehavior(0), refCount())
// 一旦有了subscriber,就会自动调用connect()
console.log('observerA subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
setTimeout(() => {
// 两秒后再增加一个subscribed
console.log('observerB subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
}, 2000)
//observerA subscribed
//observerA:0 1 2 3 4 5
//observerB subscribed
- publishReplay
可以通过publishReplay(2)拿到最新的两个值,也可以在第二个参数publishReplay(2,3000)中设置过期时间
const source = range(1, 5)
const refCounted = source.pipe(publishReplay(2), refCount())
// 一旦有了subscriber,就会自动调用connect()
console.log('observerA subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
setTimeout(() => {
// 两秒后再增加一个subscribed
console.log('observerB subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
}, 2000)
//observerA subscribed
//observerA:0 1 2 3 4 5
//observerB subscribed
//observerB:4 5
- publishLast
在结束时,推送最后一个值
const source = range(1, 5)
const refCounted = source.pipe(publishLast(), refCount())
// 一旦有了subscriber,就会自动调用connect()
console.log('observerA subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
setTimeout(() => {
// 两秒后再增加一个subscribed
console.log('observerB subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
}, 2000)
//observerA subscribed
//observerA:5
//observerB subscribed
//observerB:5
share
publish+refCount的简写
const source = interval(1000)
const refCounted = source.pipe(share())
// 一旦有了subscriber,就会自动调用connect()
console.log('observerA subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerA:${v}`)
})
setTimeout(() => {
// 两秒后再增加一个subscribed
console.log('observerB subscribed')
refCounted.subscribe({
next: (v) => console.log(`observerB:${v}`)
})
}, 2000)
//observerA subscribed
//observerA:0
//observerA:1
//observerB subscribed
//observerA:2
//observerB:2
//……
shareReplay
publishReplay+refCount的简写
const source = range(2,8);
const refCounted = source.pipe(shareReplay(2,3000))
网友评论