美文网首页
Rx.js 入门

Rx.js 入门

作者: 风之化身呀 | 来源:发表于2019-05-26 12:57 被阅读0次

    基本概念

    1、RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observable 与 Observer。
    2、Observable 作为被观察者,是一个值或事件的流集合;Observer 则作为观察者,根据 Observable 进行处理。
    3、Observable 与 Observer 之间的订阅发布关系(观察者模式) 如下:
    订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。即 observeable.subscribe(observer) 订阅 observeable
    发布:Observeable 内部通过回调 next 方法向 Observer 发布事件。即
    observer.next() 通知 Observer
    4、observer 是一个形如这样的对象,不必实现所有方法

    {
      next(x) { console.log(x); },
      error(err) { console.error(err); },
      complete() { console.log('done')}
    }
    

    1、Observable

    观察者模式 和 迭代器模式是 Observable 的基础,创建 Observable 的方式

    • 原始创建
    // 使用 Observable 创建
    $ = new Observable((observer)=>{
      // 决定何时调用 observer.next 方法
    })
    $.subscribe({
         next:()=>{},
         error:()=>{},
         complete:()=>{}
    })
    // 或者
    $.subscribe(next,err,complete)
    
    // 使用 Subject 创建,具有更多的控制权
    import { Subject } from 'rxjs';
    const subject = new Subject();
    subject.next('missed message from Subject');
    subject.subscribe(v => console.log(v));
    subject.next('hello from subject!');
    
    // 包装 ajax
    let stream = Rx.Observable.create((observer) => {
       let request = new XMLHttpRequest();
    
       request.open( ‘GET’, ‘url’ );
       request.onload =() =>{
          if(request.status === 200) {
             observer.next( request.response );
             observer.complete();
         } else {
              observer.error('error happened');
         }
       }
    
       request.onerror = () => {  
           observer.error('error happened')
       }
       request.send();
    })
    
    stream.subscribe(
       (data) => console.log( data )  
    )
    

    每次调用 subscribe 方法时,都会将其参数转换成 observer 需要的对象格式传个 Observable

    • of
    import { of } from "rxjs";
    
    const source$ = of('Semlinker', 'Lolo');
    
    source$.subscribe({
        next: function(value) {
          console.log(value);
        },
        complete: function() {
          console.log('complete!');
        },
        error: function(error) {
          console.log(error);
        }
    });
    // output 
    Semlinker
    Lolo
    complete!
    
    • from
      1、数据源为数组
    import { from } from "rxjs";
    
    const source$ = from([1, 2, 3]); // 也支持字符串,比如"Angular"
    
    source$.subscribe({
      next: function(value) {
        console.log(value);
      },
      complete: function() {
        console.log("complete!");
      },
      error: function(error) {
        console.log(error);
      }
    });
    // output
    1
    2
    3
    complete!
    

    2、数据源为 Promise 对象

    import { from } from "rxjs";
    
    const promiseSource$ = from(new Promise(resolve => resolve("Hello World!")));
    
    promiseSource$.subscribe({
      next: function(value) {
        console.log(value);
      },
      complete: function() {
        console.log("complete!");
      },
      error: function(error) {
        console.log(error);
      }
    });
    
    • fromEvent
    import { fromEvent } from "rxjs";
    import { map } from "rxjs/operators";
    
    const source$ = fromEvent(document, "click");
    const example$ = source$.pipe(map(event => `Event time: ${event.timeStamp}`));
    const subscribe = example$.subscribe(val => console.log(val));
    
    
    • timer
      timer 支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 也可以只传一个参数
    import { timer } from "rxjs";
    
    const source$ = timer(1000, 5000);
    const subscribe = source$.subscribe(val => console.log(val));
    // output
    0 # 1s后
    1 # 5s后
    2 # 5s后
    ...
    
    import { timer } from "rxjs";
    
    const source$ = timer(1000);
    source$.subscribe(
      val => console.log(val),
      () => console.error("error!"),
      () => console.log("complete!")
    );
    // output 
    0
    complete
    
    • interval
      interval 支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。
    import { interval } from "rxjs";
    
    const source$ = interval(1000);
    source$.subscribe(val => console.log(val));
    // output
    0
    1
    ...
    

    2、Operators

    Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。多个 Operator 函数可以用 pipe 进行连接:

    import { ajax } from 'rxjs/ajax';
    import { map, catchError } from 'rxjs/operators';
    const apiData = ajax('/api/data').pipe(
      map(res => {
        if (!res.response) {
          throw new Error('Value expected!');
        }
        return res.response;
      }),
      catchError(err => of([]))
    );
     
    apiData.subscribe({
      next(x) { console.log('data: ', x); },
      error(err) { console.log('errors already caught... will not run'); }
    });
    

    2.1、常见操作符

    • 创建类操作符
      from,fromEvent,fromPromise,of
      of 与 from 在传普通值时的区别
    Observable.of([1, 2, 3]).subscribe(x => console.log(x));  // x=[1,2,3]
    Observable.from([1, 2, 3]).subscribe(x => console.log(x)); // x=1,x=2,x=3
    
    • 组合类操作符 concat,merge
      merge 将多个 observables 转换成单个 observable
      concat 按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值
    • 过滤类操作符 debounceTime, distinctUntilChanged, filter, take, takeUntil
      debounceTime 舍弃掉在两次输出之间小于指定时间的发出值
    const example = fromEvent(input, 'keyup').pipe(map(i => i.currentTarget.value));
    
    // 在两次键盘敲击之间等待0.5秒方才发出当前值,
    // 并丢弃这0.5秒内的所有其他值
    const debouncedInput = example.pipe(debounceTime(500));
    

    distinctUntilChanged 只有当当前值与之前最后一个值不同时才将其发出

    const myArrayWithDuplicatesInARow = from([1, 1, 2, 2, 3, 1, 2, 3]);
    
    const distinctSub = myArrayWithDuplicatesInARow
      .pipe(distinctUntilChanged())
      // 输出: 1,2,3,1,2,3
      .subscribe(val => console.log('DISTINCT SUB:', val));
    

    filter 过滤掉不符合的

    // 发出 (1,2,3,4,5)
    const source = from([1, 2, 3, 4, 5]);
    // 过滤掉奇数
    const example = source.pipe(filter(num => num % 2 === 0));
    

    take 在完成前发出前N个值

    // 发出 1,2,3,4,5
    const source = of(1, 2, 3, 4, 5);
    // 取第一个发出的值然后完成
    const example = source.pipe(take(1));
    

    takeUntil 发出值,直到提供的 observable 发出值,它便完成

    // 每1秒发出值
    const source = interval(1000);
    // 5秒后发出值
    const timer$ = timer(5000);
    // 当5秒后 timer 发出值时, source 则完成
    const example = source.pipe(takeUntil(timer$));
    // 输出: 0,1,2,3
    const subscribe = example.subscribe(val => console.log(val));
    
    • 转换类操作符 map, mergeMap, switchMap,scan
      map 对源 observable 的每个值应用投射函数
    // 发出 (1,2,3,4,5)
    const source = from([1, 2, 3, 4, 5]);
    // 每个数字加10
    const example = source.pipe(map(val => val + 10));
    // 输出: 11,12,13,14,15
    const subscribe = example.subscribe(val => console.log(val));
    

    mergeMap和 flatMap 功能一致,映射成 observable 并发出值
    switchMap 映射成 observable,完成前一个内部 observable,发出值
    scan 随着时间的推移进行归并

    import { of } from 'rxjs';
    import { scan } from 'rxjs/operators';
    
    const source = of(1, 2, 3);
    // 基础的 scan 示例,从0开始,随着时间的推移计算总数
    const example = source.pipe(scan((acc, curr) => acc + curr, 0));
    // 输出累加值
    // 输出: 1,3,6
    const subscribe = example.subscribe(val => console.log(val));
    
    • 工具类操作符
      tap 用于打日志
    const source = of(1, 2, 3, 4, 5);
    // 使用 tap 透明地打印 source 中的值
    const example = source.pipe(
      tap(val => console.log(`BEFORE MAP: ${val}`)),
      map(val => val + 10),
      tap(val => console.log(`AFTER MAP: ${val}`))
    );
    

    2.2、业务中常用的操作符

    • mergeMap
      用于处理有依赖的串行请求
    this.http.get(this.apiUrl)
          .subscribe(users => {
            let username = users[6].username;
            this.http.get(`${this.apiUrl}?username=${username}`)
              .subscribe(
              user => {
                this.username = username;
                this.user = user;
              });
          });
    // 改写为
    this.http.get(this.apiUrl)
          .pipe(mergeMap(users => {
            let username = users[6].username;
            return this.http.get(`${this.apiUrl}?username=${username}`)
          })).subscribe(user => {
            this.user = user
          });
    
    • forkJoin
      该操作符与 Promise.all() 实现的功能类似,处理并行请求
    let post1$ = this.http.get(`${this.apiUrl}/1`);
        let post2$ = this.http.get(`${this.apiUrl}/2`);
    
        forkJoin([post1$, post2$])
          .subscribe(results => {
            this.post1 = results[0];
            this.post2 = results[1];
          });
    

    对比 Promise

    Promise 的缺点:
    1、不能生产多值,只能消费一次
    2、不能重试

    3、Subscription

    Subscription是一个代表可以终止资源的对象,表示一个Observable的执行过程。Subscription有一个重要的方法:unsubscribe。这个方法不需要传入参数,调用后便会终止相应的资源。

    var observable1 = Rx.Observable.interval(400);
    var observable2 = Rx.Observable.interval(300);
    
    var subscription = observable1.subscribe(x => console.log('first: ' + x));
    var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
    
    subscription.add(childSubscription);
    
    setTimeout(() => {
      // 终止所有嵌套的Subscription
      subscription.unsubscribe();
    }, 1000);
    // 输出
    second: 0
    first: 0
    second: 1
    first: 1
    second: 2
    

    如果不用 subscription.unsubscribe,则会一直产生数据,容易造成内存泄露

    常用操作符

    • timer
    import { timer } from 'rxjs';
    const source = timer(1000); // 1秒后发出0,然后结束,因为没有提供第二个参数
    const subscribe = source.subscribe(val => console.log(val)); // 输出: 0
    
    /*
      timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
      然后每2秒发出序列值
    */
    const source = timer(1000, 2000);
    const subscribe = source.subscribe(val => console.log(val)); // 输出: 0,1,2,3,4,5......
    
    • inverval
    import { interval } from 'rxjs';
    const source = interval(1000); // 1秒后发出0,然后每1秒发出数字序列中的其它值
    const subscribe = source.subscribe(val => console.log(val)); // 数字: 0,1,2,3,4,5....
    
    • take
      取到前 n 个值便结束
    • takeLast
      取到最后 n 个值便结束
    var source = Rx.Observable.interval(1000).take(6);
    var example = source.takeLast(2);
    
    example.subscribe({
        next: (value) => { console.log(value); },
        error: (err) => { console.log('Error: ' + err); },
        complete: () => { console.log('complete'); }
    });
    
    source : ----0----1----2----3----4----5|
                    takeLast(2)
    example: ------------------------------(45)|
    

    注意:1、takeLast 必须等source完结;2、takeLast 流中的值同步发出,如上例同时发出45,而不是先4,再5。即 console.log 连续执行两次,中间不隔1s

    • takeUntil
      接受 observeable 为参数,在 observeable 发出值之前 source 可以一直取值
    import { interval, timer } from 'rxjs';
    import { takeUntil } from 'rxjs/operators';
    
    // 每1秒发出值
    const source = interval(1000);
    // 5秒后发出值
    const timer$ = timer(5000);
    // 当5秒后 timer 发出值时, source 则完成
    const example = source.pipe(takeUntil(timer$));
    // 输出: 0,1,2,3
    const subscribe = example.subscribe(val => console.log(val));
    
    • tap
    const source = of(1, 2, 3, 4, 5);
    // 使用 tap 透明地打印 source 中的值,tap 不改变流中的值
    const example = source.pipe(
      tap(val => console.log(`BEFORE MAP: ${val}`)),
      map(val => val + 10),
      tap(val => console.log(`AFTER MAP: ${val}`))
    );
    
    
    • mergeMap
      多个内部 Observable 产生的值合并推给下游
    const button = document.querySelector('button');
    
    const click$ = fromEvent(button, 'click');
    const interval$ = interval(1000);
    
    const observable$ = click$.pipe(mergeMap(event => {
      return interval$;
    }));
    
    observable$.subscribe(num => console.log(num));
    
    • switchMap
      打平高阶 Observeable。此操作符可以取消正在进行中的网络请求!
    switchMap(project: function: Observable, resultSelector: function(outerValue, innerValue, outerIndex, innerIndex): any): Observable
    
    import { timer } from 'rxjs/observable/timer';
    import { interval } from 'rxjs/observable/interval';
    import { switchMap } from 'rxjs/operators';
    // 立即发出值, 然后每5秒发出值
    const source = timer(0, 5000);
    // 当 source 发出值时切换到新的内部 observable,调用投射函数并发出值
    const example = source.pipe(
      switchMap(
        _ => interval(2000),
        (outerValue, innerValue, outerIndex, innerIndex) => ({
          outerValue,
          innerValue,
          outerIndex,
          innerIndex
        })
      )
    );
    /*
        输出:
        {outerValue: 0, innerValue: 0, outerIndex: 0, innerIndex: 0}
        {outerValue: 0, innerValue: 1, outerIndex: 0, innerIndex: 1}
        {outerValue: 1, innerValue: 0, outerIndex: 1, innerIndex: 0}
        {outerValue: 1, innerValue: 1, outerIndex: 1, innerIndex: 1}
    */
    const subscribe = example.subscribe(val => console.log(val));
    
    • withLatestFrom
      接受一个 observable 参数,必须两个observable都至少发出过一个值整体才输出值,注意输出值是数组
    // 1、参数 observable 频率比 source 高
    import { withLatestFrom, map } from 'rxjs/operators';
    import { interval } from 'rxjs/observable/interval';
    // 每5秒发出值
    const source = interval(5000);
    // 每1秒发出值
    const secondSource = interval(1000);
    const example = source.pipe(
      withLatestFrom(secondSource),
      map(([first, second]) => {
        return `First Source (5s): ${first} Second Source (1s): ${second}`;
      })
    );
    /*
      输出:
      "First Source (5s): 0 Second Source (1s): 4"
      "First Source (5s): 1 Second Source (1s): 9"
      "First Source (5s): 2 Second Source (1s): 14"
      ...
    */
    const subscribe = example.subscribe(val => console.log(val));
    
    // 2、参数 observable 频率比 source 底
    import { withLatestFrom, map } from 'rxjs/operators';
    import { interval } from 'rxjs/observable/interval';
    // 每5秒发出值
    const source = interval(5000);
    // 每1秒发出值
    const secondSource = interval(1000);
    // withLatestFrom 的 observable 比源 observable 慢
    const example = secondSource.pipe(
      // 两个 observable 在发出值前要确保至少都有1个值 (需要等待5秒)
      withLatestFrom(source),
      map(([first, second]) => {
        return `Source (1s): ${first} Latest From (5s): ${second}`;
      })
    );
    /*
      "Source (1s): 4 Latest From (5s): 0"
      "Source (1s): 5 Latest From (5s): 0"
      "Source (1s): 6 Latest From (5s): 0"
      ...
    */
    const subscribe = example.subscribe(val => console.log(val));
    
    • forkJoin
      当有一组 observables,但你只关心每个 observable 最后发出的值时,此操作符是最适合的。只有每个 observables 都完结后,整体才会发出值,值是个数组
    import { delay, catchError } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';
    import { _throw } from 'rxjs/observable/throw';
    /*
      当所有 observables 完成时,将每个 observable 
      的最新值作为数组发出
    */
    const example = forkJoin(
      // 立即发出 'Hello'
      of('Hello'),
      // 1秒后发出 'World'
      of('World').pipe(delay(1000)),
      // 抛出错误
      _throw('This will error').pipe(catchError(error => of(error)))
    );
    // 输出: ["Hello", "World", "This will error"]
    const subscribe = example.subscribe(val => console.log(val));
    
    • pairwise
      将当前值和前一个值作为数组放在一起,然后将其发出
    of(1, 2, 3)
      .pipe(
        pairwise()).subscribe(v => console.log(v))
    [1,2]
    [2,3]
    

    参考

    相关文章

      网友评论

          本文标题:Rx.js 入门

          本文链接:https://www.haomeiwen.com/subject/rwfxhqtx.html