美文网首页
RxJS 学习系列 14. Subject 基本概念

RxJS 学习系列 14. Subject 基本概念

作者: 飞凡的陀螺 | 来源:发表于2019-05-11 16:34 被阅读0次

    终于进到了 RxJS 的第二个重点 Subject,不知道读者们有没有发现? 我们在这篇文章之前的范例,每个 observable 都只订阅了一次,而实际上 observable 是可以多次订阅的

     const source = rxjs.interval(1000).pipe(take(3));
    
      // observer 其实就是实现了next方法,error 方法和 complete 方法的对象
      const observerA = {
        next: value => {
          console.log(`A:` + value )
        },
        error: error => {
          console.log('Error:', error);
        },
        complete: () => {
          console.log('complete');
        }
      };
    
      const observerB = {
        next: value => {
          console.log(`B:` + value)
        },
        error: error => {
          console.log('Error:', error);
        },
        complete: () => {
          console.log('complete');
        }
      };
    
    source.subscribe(observerA);
    source.subscribe(observerB);
    
    // "A: 0"
    // "B: 0"
    // "A: 1"
    // "B: 1"
    // "A: 2"
    // "A complete!"
    // "B: 2"
    // "B complete!"
    

    上面这段代码,分别用 observerA 跟 observerB 订阅了 source,从 log 可以看出来 observerA 跟 observerB 都各自收到了元素,但请记得这两个 observer 其实是分开执行的也就是说他们是完全独立的,我们把 observerB 延迟订阅来证明看看。

    const source = rxjs.interval(1000).pipe(take(3));
    
    const observerA = {
        next: value => console.log('A next: ' + value),
        error: error => console.log('A error: ' + error),
        complete: () => console.log('A complete!')
    }
    
    const observerB = {
        next: value => console.log('B next: ' + value),
        error: error => console.log('B error: ' + error),
        complete: () => console.log('B complete!')
    }
    
    source.subscribe(observerA);
    setTimeout(() => {
        source.subscribe(observerB);
    }, 1000);
    
    // "A next: 0"
    // "A next: 1"
    // "B next: 0"
    // "A next: 2"
    // "A complete!"
    // "B next: 1"
    // "B next: 2"
    // "B complete!"
    

    这里我们延迟一秒再用 observerB 订阅,可以从 log 中看出 1 秒后 observerA 已经打印到了 1,这时 observerB 开始打印却是从 0 开始,而不是接著 observerA 的进度,代表这两次的订阅是完全分开来执行的,或者说是每次的订阅都建立了一个新的执行。
    这样的行为在大部分的情景下使用,但有些情况下我们会希望第二次订阅 source 不会从头开始接收元素,而是从第一次订阅到当前处理的元素开始发送,我们把这种处理方式称为组播(multicast),那我们要如何做到组播呢?

    手动实现 subject

    或许已经有读者想到解法了,其实我们可以建立一个中间人来订阅 source 再由中间人转送数据出去,就可以达到我们想要的效果

    const source = rxjs.interval(1000).pipe(take(3));
    
    const observerA = {
        next: value => console.log('A next: ' + value),
        error: error => console.log('A error: ' + error),
        complete: () => console.log('A complete!')
    }
    
    const observerB = {
        next: value => console.log('B next: ' + value),
        error: error => console.log('B error: ' + error),
        complete: () => console.log('B complete!')
    }
    
    const subject = {
        observers: [],
        addObserver: function(observer) {
            this.observers.push(observer)
        },
        next: function(value) {
            this.observers.forEach(o => o.next(value))    
        },
        error: function(error){
            this.observers.forEach(o => o.error(error))
        },
        complete: function() {
            this.observers.forEach(o => o.complete())
        }
    }
    
    subject.addObserver(observerA)
    
    source.subscribe(subject);
    
    setTimeout(() => {
        subject.addObserver(observerB);
    }, 1000);
    
    // "A next: 0"
    // "A next: 1"
    // "B next: 1"
    // "A next: 2"
    // "B next: 2"
    // "A complete!"
    // "B complete!"
    

    从上面的代码可以看到,我们先建立了一个对象叫 subject,这个对象具备 observer 所有的方法(next, error, complete),并且还有一个 addObserver 方法,是把 observer 加到内部的清单中,每当有值送出就会遍历清单中的所有 observer 并把值再次送出,这样一来不管多久之后加进来的 observer,都会是从当前处理到的元素接续往下走,就像范例中所示,我们用 subject 订阅 source 并把 observerA 加到 subject 中,一秒后再把 observerB 加到 subject,这时就可以看到 observerB 是直接收 1 开始,这就是组播或多播(multicast)的行为。
    让我们把 subject 的 addObserver 改名成 subscribe。

    const subject = {
        observers: [],
        subscribe: function(observer) {
            this.observers.push(observer)
        },
        next: function(value) {
            this.observers.forEach(o => o.next(value))    
        },
        error: function(error){
            this.observers.forEach(o => o.error(error))
        },
        complete: function() {
            this.observers.forEach(o => o.complete())
        }
    }
    

    虽然上面是我们自己手写的 subject,但运行方式跟 RxJS 的 Subject 实例是几乎一样的,我们把前面的代码改成 RxJS 提供的 Subject 试试

      const source = rxjs.interval(1000).pipe(take(3));
      const subject = new rxjs.Subject();
    
      const observerA = {
        next: value => {
          console.log(`A:` + value )
        },
        error: error => {
          console.log('Error:', error);
        },
        complete: () => {
          console.log('complete');
        }
      };
    
      const observerB = {
        next: value => {
          console.log(`B:` + value)
        },
        error: error => {
          console.log('Error:', error);
        },
        complete: () => {
          console.log('complete');
        }
      };
    
      // 不会执行,相当于注册 observer
      subject.subscribe(observerA);
      subject.subscribe(observerB);
    
      source.subscribe(subject);
    

    大家会发现使用方式跟前面是相同的,建立一个 subject 先拿去订阅 observable(source),再把我们真正的 observer 加到 subject 中,这样一来就能完成订阅,而每个加到 subject 中的 observer 都能整组的接收到相同的元素。

    什么是 Subject?

    首先 Subject 可以拿去订阅 Observable(source) 代表他是一个 Observer,同时 Subject 又可以被 Observer(observerA, observerB) 订阅,代表他是一个 Observable。

    总结成两句话

    • Subject 同时是 Observable 又是 Observer
    • Subject 会对内部的 observers 清单列表进行组播(multicast)

    参考:
    https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-22/

    相关文章

      网友评论

          本文标题:RxJS 学习系列 14. Subject 基本概念

          本文链接:https://www.haomeiwen.com/subject/qemtkqtx.html