美文网首页
12RxJS多播操作符

12RxJS多播操作符

作者: learninginto | 来源:发表于2021-01-11 10:03 被阅读0次
    multicast

    之前演示过用Subject将单播Observable转成多播,multicast做的也是同样的事。(源头发送的值,会传送到各个observalbe中去)

    • 通过subject将单播转成多播的方式
    const observable = from([1, 2, 3])
    const subject = new Subject<number>();
    subject.subscribe({
      next: (v) => console.log(`observer A:${v}`)
    })
    subject.subscribe({
      next: (v) => console.log(`observer B:${v}`)
    })
    observable.subscribe(subject)
    //observer A:1
    //observer B:1
    //observer A:2
    //observer B:2
    //observer A:3
    //observer B:3
    
    • 通过multicast实现相同的效果
    const source = from([1, 2, 3])
    const multicasted = source.pipe(multicast(new Subject())) as ConnectableObservable<number>
    
    multicasted.subscribe({
      next: (v) => console.log(`observer A:${v}`)
    })
    multicasted.subscribe({
      next: (v) => console.log(`observer B:${v}`)
    })
    multicasted.connect()
    

    这里的connect相当于之前的subscribe,返回的同样是subscription

    • 官网案例
    const source = interval(500)
    const subject = new Subject();
    const multicasted = source.pipe(multicast(subject)) as ConnectableObservable<number>
    let subscription1, subscription2, subscriptionConnect;
    subscription1 = multicasted.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    subscriptionConnect = multicasted.connect();
    setTimeout(() => {
      subscription2 = multicasted.subscribe({
        next: (v) => console.log(`observerB:${v}`)
      })
    }, 1000)
    setTimeout(() => {
      subscription1.unsubscribe();
    }, 2000)
    setTimeout(() => {
      subscription2.unsubscribe();
      //subscription2退订后,source已经没有订阅者了,但要取消subscriptionConnect才是真正的退订
      subscriptionConnect.unsubscribe();
    }, 7000)
    //observerA:0 1 2
    //observerB:2
    //observerA:3
    //observerB:3 4 5 6 7 8 9 10 11 12 13
    

    如果觉得multicast必须调用connect方法才能推送值,还要multicasted.unsubscribe()结束推送有些麻烦,就可以用refCount

    refCount

    当有Observer订阅源Observable时,自动调用connect,当Observer全部unsubscribe后,即没有Observer了,自动调用connect().unsubscribe()退订

    const source = interval(500)
    const subject = new Subject();
    const multicasted = source.pipe(multicast(new (Subject)), refCount()) as ConnectableObservable<number>
    let subscription1, subscription2, subscriptionConnect;
    subscription1 = multicasted.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    //当有subscribe会自动调用,不再需要connect()
    // subscriptionConnect = multicasted.connect();
    setTimeout(() => {
      subscription2 = multicasted.subscribe({
        next: (v) => console.log(`observerB:${v}`)
      })
    }, 1000)
    setTimeout(() => {
      subscription1.unsubscribe();
    }, 2000)
    setTimeout(() => {
      subscription2.unsubscribe();
      //同样,当不存在订阅的时候,会自动将subscriptionConnect取消
      // subscriptionConnect.unsubscribe();
    }, 7000)
    

    通过使用refCount,效果和上面的相同

    publish

    multicast(new Subject)很常用,可用publish将其简化:

    const subject = new Subject();
    const refCount = source.pipe(multicast(subject),refCount())
    
    //等价于
    const refCounted = source.pipe(publish(),refCount())
    

    与Subject类似,publish也有三种变种方法:

    1. publishBehavior(0) => new BehaviorSubject(0)
    2. publishReplay(2) => new ReplaySubject(2)
    3. publishLast() => new AsyncSubject()
    1. publishBehavior

    与BehaviorSubject()略有不同,observserB并没有推送

    const source = range(1, 5)
    const refCounted = source.pipe(publishBehavior(0), refCount())
    
    // 一旦有了subscriber,就会自动调用connect()
    console.log('observerA subscribed')
    refCounted.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    
    setTimeout(() => {
      // 两秒后再增加一个subscribed
      console.log('observerB subscribed')
      refCounted.subscribe({
        next: (v) => console.log(`observerB:${v}`)
      })
    }, 2000)
    
    //observerA subscribed
    //observerA:0 1 2 3 4 5
    //observerB subscribed
    
    1. publishReplay

    可以通过publishReplay(2)拿到最新的两个值,也可以在第二个参数publishReplay(2,3000)中设置过期时间

    const source = range(1, 5)
    const refCounted = source.pipe(publishReplay(2), refCount())
    
    // 一旦有了subscriber,就会自动调用connect()
    console.log('observerA subscribed')
    refCounted.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    
    setTimeout(() => {
      // 两秒后再增加一个subscribed
      console.log('observerB subscribed')
      refCounted.subscribe({
        next: (v) => console.log(`observerB:${v}`)
      })
    }, 2000)
    
    //observerA subscribed
    //observerA:0 1 2 3 4 5
    //observerB subscribed
    //observerB:4 5
    
    1. publishLast

    在结束时,推送最后一个值

    const source = range(1, 5)
    const refCounted = source.pipe(publishLast(), refCount())
    
    // 一旦有了subscriber,就会自动调用connect()
    console.log('observerA subscribed')
    refCounted.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    
    setTimeout(() => {
      // 两秒后再增加一个subscribed
      console.log('observerB subscribed')
      refCounted.subscribe({
        next: (v) => console.log(`observerB:${v}`)
      })
    }, 2000)
    //observerA subscribed
    //observerA:5
    //observerB subscribed
    //observerB:5
    
    share

    publish+refCount的简写

    const source = interval(1000)
    const refCounted = source.pipe(share())
    
    // 一旦有了subscriber,就会自动调用connect()
    console.log('observerA subscribed')
    refCounted.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    
    setTimeout(() => {
      // 两秒后再增加一个subscribed
      console.log('observerB subscribed')
      refCounted.subscribe({
        next: (v) => console.log(`observerB:${v}`)
      })
    }, 2000)
    //observerA subscribed
    //observerA:0
    //observerA:1
    //observerB subscribed
    //observerA:2
    //observerB:2
    //……
    
    shareReplay

    publishReplay+refCount的简写

    const source = range(2,8);
    const refCounted = source.pipe(shareReplay(2,3000))
    

    相关文章

      网友评论

          本文标题:12RxJS多播操作符

          本文链接:https://www.haomeiwen.com/subject/cxwraktx.html