rxjs

作者: pixels | 来源:发表于2017-09-05 18:19 被阅读10次

    rxjs使用观察者模式、迭代器模式以及函数式编程实现一种理想的、管理序列事件的方式
    rxjs的基础概念

    1. Observable:是一个包含多个值的集合,这些值都是懒推送(lazy push)进集合中的
    2. Observer
    3. Subscription
    4. Operators
    5. Subject
    6. Schedulers

    pull push system

    先搞清楚两个角色:

    1. 生产者(producer): 数据产生的地方
    2. 消费者(consumer): 数据使用的地方

    pull系统:consumer决定什么时候接受producer生产的数据,比如函数,就是一个pull system,它只生产数据,并不知道什么时候这些数据会被使用。
    push系统:producer决定什么时候把生产的数据传递给消费者,如promise,promise决定什么时候把生产的值“push”给callback函数

    Observable

    Observable的核心概念:

    1. creating Observables:可以通过Rx.Observable.create创建,或者通过所谓的创建操作如:of、from、interval等创建
    2. Subscribing Observables:Observables的注册就像调用一个函数,这个函数提供一个回调函数,数据最终会在这个回调函数中使用
    3. Executing the Observable:Observable.create(function subscribe(observer) {...})中的一段代码,在Excution中,如果error或者complete执行了,那么后续的observer就不会执行
    4. Disposing Observables:处理Observables。Executing Observables可能是循环的,需要一个unsubscribe()去终止这个无限循环

    概念很不好理解,下面是一个js写的简易版Observale,仅帮助理解,注释表明了1,2,3的含义

    
    var observerOrigin = function(nextSelf) {
      this.nextSelf = nextSelf ? nextSelf : null
    }
    observerOrigin.closed = false
    observerOrigin.prototype.next = function(val) {
      if (observerOrigin.closed) {
        return
      }
      this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
    }
    observerOrigin.prototype.error = function(error) {
      if (observerOrigin.closed) {
        return
      }
      observerOrigin.closed = true
      console.error(error)
    }
    observerOrigin.prototype.complete = function(val) {
      if (observerOrigin.closed) {
        return
      }
      observerOrigin.closed = true
      console.log('complete')
    }
    
    var observable = function(subscribeFun) {
      this.subscribeFun = subscribeFun
    }
    observable.prototype.subscribe = function(observer) {
      let observerInner = new observerOrigin(observer) // a observer
      this.subscribeFun.call(this, observerInner)
      return this
    }
    /* subscribeFunEx: Executing Observables
       Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
    */
    var subscribeFunEx = function(observer) {
      observer.next(1)
      observer.next(2)
      // observer.error('throw a error')
      // observer.complete()
      observer.next(3)
    }
    // Creating Observables
    var observableEx = new observable(subscribeFunEx)
    observableEx
      .subscribe()   // Subscribing to Observables
      .subscribe((val) => console.log('next: ', val))   // Subscribing to Observables
    
    

    对于第四点,官方示例如下

    function subscribe(observer) {
      var intervalID = setInterval(() => {
        observer.next('hi');
      }, 1000);
    
      return function unsubscribe() {
        clearInterval(intervalID);
      };
    }
    
    var unsubscribe = subscribe({next: (x) => console.log(x)});
    
    // Later:
    unsubscribe(); // dispose the resources
    

    Observer

    Observer是一个对象,这个对象有三个回调函数(next,error,complete),任何一个回调函数都可能调用

    Subscription

    var subscription = observable.subscribe(x => console.log(x)),subscription有一个unsubscribe()方法释放所有的资源并且取消Observable的执行,也可以通过add()将多个subscription放在一起(个人感觉类似数组的unshift),这个时候调用一个subscription的unsubscribe()方法可能会将多个Subscriptionunsubscribe()

    var observable1 = Rx.Observable.interval(400);
    var observable2 = Rx.Observable.interval(300);
    
    var subscription = observable1.subscribe(x => console.log('first: ' + x));
    var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
    
    subscription.add(childSubscription);
    
    setTimeout(() => {
      // Unsubscribes BOTH subscription and childSubscription
      subscription.unsubscribe();
    }, 1000);
    

    Subscription还有一个remove(otherSubscription)方法

    Subject

    一个Subject就是一个Observable,和Observable的区别是,Subject可以多播多个observers,它就是一个注册器,订阅者将自己想要订阅的事件注册到注册中心。Subject的subscribe并不会立即执行传递过来的值,它只是将订阅的事件放到一个observers的list中,类似addListener的作用
    一个Subject也是一个Observer,通过next(value)可以将值多播至注册在Subject中的订阅事件

    var subject = new Rx.Subject();  // 一个Observables
    
    subject.subscribe({  // subscribe类似于别的语言中的addListener
      next: (v) => console.log('observerA: ' + v)  // 订阅的事件
    });
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v) // 订阅的事件
    });
    
    subject.next(1); // 将value值1多播至上面的订阅事件中
    subject.next(2);// 将value值2多播至上面的订阅事件中
    

    打印结果:

    observerA: 1
    observerB: 1
    observerA: 2
    observerB: 2
    

    subject也是一个observer,所以也可以observable.subscribe(subject)

    Muticasted Observables

    multicast返回的的Observable的subscribe方法和Subject的subscribe方法作用相同(即类似其他语言的addListener),connect方法调用的是observable的subscribe

    var source = Rx.Observable.from([1, 2, 3]);
    var subject = new Rx.Subject();
    var multicasted = source.multicast(subject);
    
    // These are, under the hood, `subject.subscribe({...})`:
    multicasted.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    multicasted.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    // This is, under the hood, `source.subscribe(subject)`:
    multicasted.connect();
    

    BehaviorSubject

    BehaviorSubject表示“随着时间变化的值”,例如人的生日是不变的时间流,使用Subject,那么人的年龄就是随着时间变化的事件流,用BehaviorSubject表示

    var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    
    subject.next(1);
    subject.next(2);
    
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    subject.next(3);
    

    输出

    observerA: 0
    observerA: 1
    observerA: 2
    observerB: 2
    observerA: 3
    observerB: 3
    

    如果将上述的BehaviorSubject换成Subject,输出将变为

    observerA: 1
    observerA: 2
    observerA: 3
    observerB: 3
    

    ReplaySubject

    ReplaySubject记录多个来自Observable excution的值,并将它们分配给新的subscribes

    var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
    
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    subject.next(5);
    

    输出:

    observerA: 1
    observerA: 2
    observerA: 3
    observerA: 4
    observerB: 2
    observerB: 3
    observerB: 4
    observerA: 5
    observerB: 5
    

    设置一个windowTime来决定,分配多少个并且最近windowTime时间内来自Observable excution的值

    // Rx.ReplaySubject(10, 1000),observerB记录1开始至结束的值
    // Rx.ReplaySubject(10, 500),observerB记录3开始至结束(500ms~1000ms之前的buffer)的值
    // Rx.ReplaySubject(3, 1000),observerB记录3开始至结束(只subscribe3个buffer)的值
    var subject = new Rx.ReplaySubject(10, 1000 /* windowTime */);
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    
    var i = 1;
    setInterval(() => subject.next(i++), 200);
    
    setTimeout(() => {
      subject.subscribe({
        next: (v) => console.log('observerB: ' + v)
      });
    }, 1000);
    
    setTimeout(() => {
      subject.unsubscribe();
    }, 5000);
    

    AsyncSubject

    AsyncSubject中,只有最后一个值会传递给observers

    var subject = new Rx.AsyncSubject();
    
    subject.subscribe({
      next: (v) => console.log('observerA: ' + v)
    });
    
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
    
    subject.subscribe({
      next: (v) => console.log('observerB: ' + v)
    });
    
    subject.next(5);
    subject.complete();
    

    输出

    observerA: 5
    observerB: 5
    

    Operator

    Operator是函数,这个函数会根据原来的Observable创建一个新的Observable,并且不会改变原来的Observable。.map(...), .filter(...), .merge(...)都是Operator

    function multiplyByTen(input) {
      var output = Rx.Observable.create(function subscribe(observer) {
        input.subscribe({
          next: (v) => observer.next(10 * v),
          error: (err) => observer.error(err),
          complete: () => observer.complete()
        });
      });
      return output;
    }
    
    var input = Rx.Observable.from([1, 2, 3, 4]);
    var output = multiplyByTen(input);
    output.subscribe(x => console.log(x));
    

    输出

    10
    20
    30
    40
    

    转化成js(简易版,很多漏洞,望指教)

    
    var observerOrigin = function(nextSelf) {
      this.nextSelf = nextSelf ? nextSelf : null
    }
    observerOrigin.closed = false
    observerOrigin.prototype.next = function(val) {
      if (observerOrigin.closed) {
        return
      }
      this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
    }
    observerOrigin.prototype.error = function(error) {
      if (observerOrigin.closed) {
        return
      }
      observerOrigin.closed = true
      console.error(error)
    }
    observerOrigin.prototype.complete = function() {
      if (observerOrigin.closed) {
        return
      }
      observerOrigin.closed = true
      console.log('complete')
    }
    
    var observable = function(subscribeFun) {
      this.subscribeFun = subscribeFun
    }
    observable.prototype.subscribe = function(observer) {
      let observerInner = new observerOrigin(observer) // a observer
      this.subscribeFun.call(this, observerInner)
      return this
    }
    /* subscribeFunEx: Executing Observables
       Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
    */
    var subscribeFunEx = function(observer) {
      observer.next(1)
      observer.next(2)
      observer.next(3)
      observer.next(4)
    }
    
    function multiplyByTen(input) {
      var output = new observable(function subscribe(observer) {
        input.subscribe(v => observer.next(10 * v))
      });
      return output;
    }
    // Creating Observables
    var input = new observable(subscribeFunEx)
    var output = multiplyByTen(input);
    output.subscribe(v => console.log(v))
    
    

    由上可以看出:output的subscribe会导致input的subscribe,这叫做“operator subscription chain”

    Instance operators versus static operators(实例运算符与静态运算符)

    在Instance operators中,this关键字是输入的Observable,通过input
    observable创建一个observable。static operators是通过Observable对象从头开始创建一个Observable

    Scheduler

    一个scheduler可以定义在什么样的执行环境中,observable会把通知传递给observer

    var observable = Rx.Observable.create(function (proxyObserver) {
      proxyObserver.next(1);
      proxyObserver.next(2);
      proxyObserver.next(3);
      proxyObserver.complete();
    })
    .observeOn(Rx.Scheduler.async);
    
    var finalObserver = {
      next: x => console.log('got value ' + x),
      error: err => console.error('something wrong occurred: ' + err),
      complete: () => console.log('done'),
    };
    
    console.log('just before subscribe');
    observable.subscribe(finalObserver);
    console.log('just after subscribe');
    

    proxyObserver是在observeOn(Rx.Scheduler.async)中创建的,scheduler在Observable.create和最终的Observer之间创建了一个proxyObserver,proxyObserver实质上通过setTimeout或者setInterval操作来实现一个延迟执行(delay)

    Scheduler种类

    Scheduler 目的
    null 消息同步的递归的传递,
    Rx.Scheduler.queue 在当前时间框架的队列中按时间表传递,用于迭代操作
    Rx.Scheduler.asap 在微型任务队列中按时间表传递,例如NodeJs的nextTick()、Web Worker的MessageChannel、setTimeout()或者其他的,用于转换成异步操作
    Rx.Scheduler.async Scheduler通过setInterval工作,用于基于事件的操作

    使用 Schedulers

    Static creation operators通常有一个Scheduler作为最后一个函数参数,如 from(array, scheduler)
    Scheduler.subscribeOn决定subscribe()在什么环境中执行
    Scheduler.observeOn决定在什么环境中传递通知
    Instance operators有一个Scheduler作为函数参数

    相关文章

      网友评论

          本文标题:rxjs

          本文链接:https://www.haomeiwen.com/subject/acqadxtx.html