美文网首页
11RxJS subject和及其3个子类

11RxJS subject和及其3个子类

作者: learninginto | 来源:发表于2021-01-10 17:19 被阅读0次
    单播和多播

    之前我们看到的所有Observable都是单播的,即源头有值发出时,不管这个Observable被几个Observer订阅,它一次只会给一个Observer推送多播:当源头有值发出时,这个值会同一时间发给所有的Observer

    简单来说,单播与多播的区别类似于concat和merge的区别

    • 单播

    按顺序依次执行,A先走完再开始B

    const source$ = range(5)
    source$.subscribe(val => console.log('A' + val))
    source$.subscribe(val => console.log('B' + val))
    //A0
    //A1
    //A2
    //B0
    //B1
    //B2
    
    • 多播

    这里的A和B同时被调用

    const source$ = range(2);
    const subject$ = new Subject();
    subject$.subscribe(val => console.log('A:' + val))
    subject$.subscribe(val => console.log('B:' + val))
    source$.subscribe(subject$);
    //A0
    //B0
    //A1
    //B1
    
    Subject
    • 什么是subject?

    subject是一种特殊的Observable,而且是多播的。

    既然Observable,就可以被subscribe,只不过每个observer都会存一份list,一旦有值发出,每个observer都会同时收到值

    subject还是observer,可以执行next(),error(),complete()的方法

    const subject = new Subject<number>();
    subject.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    subject.subscribe({
      next: (v) => console.log(`observerB:${v}`)
    })
    subject.next(1);
    subject.next(2);
    //observerA:1
    //observerB:1
    //observerA:2
    //observerB:2
    

    有了subject之后,我们可以直接把subject传到subscribe中

    const observable = from([1, 2])
    const subject = new Subject<number>();
    subject.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    subject.subscribe({
      next: (v) => console.log(`observerB:${v}`)
    })
    observable.subscribe(subject)
    //A1
    //B1
    //A2
    //B2
    

    上面的例子中,通过subject将单播的Observable转成了多播的,这是其中一种方式,rxjs提供了一些多播类的操作符也可以将单播的Observable转成多播的

    Subject三个子类

    BehaviorSubject

    BehaviorSubject可以储存最近发送的一个值,只要有新的Observer订阅,就立马推送当前的最新值

    const subject = new Subject()
    const observerA = {
      next: val => console.log('A next:' + val),
      error: err => console.log('A error:' + err),
      complete: () => console.log('A complete')
    }
    const observerB = {
      next: val => console.log('B next:' + val),
      error: err => console.log('B error:' + err),
      complete: () => console.log('B complete')
    }
    subject.subscribe(observerA);
    subject.next(1);
    subject.next(2);
    subject.subscribe(observerB)
    //这里只能接收到2之后的推送
    subject.next(3);
    //A next:1
    //A next:2
    //A next:3
    //B next:3
    

    如果想要拿到2的推送,可以使用BehaviorSubject,且必须要指定一个初始值

    const subject = new BehaviorSubject(0)
    const observerA = {
      next: val => console.log('A next:' + val),
      error: err => console.log('A error:' + err),
      complete: () => console.log('A complete')
    }
    const observerB = {
      next: val => console.log('B next:' + val),
      error: err => console.log('B error:' + err),
      complete: () => console.log('B complete')
    }
    subject.subscribe(observerA);
    subject.next(1);
    subject.next(2);
    subject.subscribe(observerB)
    subject.next(3);
    //A next:1
    //A next:2
    //B next:2
    //A next:3
    //B next:3
    
    ReplaySubject

    ReplaySubject可以指定推送最近的多少个值给新的Observer,而BehaviorSubject只会推最近的一个值

    const subject = new ReplaySubject(2)
    const observerA = {
      next: val => console.log('A next:' + val),
      error: err => console.log('A error:' + err),
      complete: () => console.log('A complete')
    }
    const observerB = {
      next: val => console.log('B next:' + val),
      error: err => console.log('B error:' + err),
      complete: () => console.log('B complete')
    }
    subject.subscribe(observerA);
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.subscribe(observerB)
    subject.next(4);
    subject.next(5);
    // 1 2 3 2 3 4 4 5 5
    

    还可以指定第二个参数,设置缓存的有效期

    缓存2个值,并且只在3秒内有效,超过3秒新的Observer将不会订阅任何值

    const subject = new ReplaySubject(2, 3000)
    subject.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    range(3).subscribe(val => subject.next(val))
    setTimeout(() => {
      subject.subscribe({
        next: (v) => console.log(`ovserverB:${v}`)
      })
    }, 2000)//这里的时间不能超过3000ms
    //observerA:0
    //observerA:1
    //observerA:2
    //observerA:1
    //observerA:2
    
    AsyncSubject

    只在Subject结束时,推送最后一个值。所以,在没有推送complete时,不会有任何响应

    const subject = new AsyncSubject()
    
    subject.subscribe({
      next: (v) => console.log(`observerA:${v}`)
    })
    subject.next(1);
    subject.next(2);
    subject.subscribe({
      next: (v) => console.log(`observerB:${v}`)
    })
    subject.next(3);
    subject.complete();
    //observerA:3
    //observerB:3
    

    相关文章

      网友评论

          本文标题:11RxJS subject和及其3个子类

          本文链接:https://www.haomeiwen.com/subject/omflaktx.html