美文网首页js学习之随心所欲
RxJS 用法(操作符)总结

RxJS 用法(操作符)总结

作者: 摆渡侠 | 来源:发表于2019-10-23 14:41 被阅读0次

    RxJS 用法总结

    本博客只总结了常用的部分RxJS方法, RxJS可能会让你对响应式编程产生新的理解。RxJS 囊括了很多数据流处理方法,比如:scan、concat、map、debounceTime、filter等等,下面就着重介绍作者认为常用的一些操作符,以及demo:

    组合系列

    1. concat

    注意:第一个observable订阅完毕后,才会订阅第二个,如果第一个完不成,或者延时完成第二个也要一直等下去。

    import { concat } from 'RxJS/operators';
    import { of } from 'RxJS';
        
    // 发出 1,2,3
    const sourceOne = of(1, 2, 3);
    // 发出 4,5,6
    const sourceTwo = of(4, 5, 6);
    // 先发出 sourceOne 的值,当完成时订阅 sourceTwo
    const example = sourceOne.pipe(concat(sourceTwo));
    const subscribe = example.subscribe(val =>
      console.log(val)
    );
    // 输出: 1,2,3,4,5,6
    

    2. merge

    注意:此方法为合并多个observable为一个,然后订阅

    import { mapTo } from 'RxJS/operators';
    import { interval, merge } from 'RxJS';
    
    // 每2.5秒发出值
    const first = interval(2500);
    // 每2秒发出值
    const second = interval(2000);
    // 每1.5秒发出值
    const third = interval(1500);
    // 每1秒发出值
    const fourth = interval(1000);
    
    // 从一个 observable 中发出输出值
    const example = merge(
      first.pipe(mapTo('FIRST!')),
      second.pipe(mapTo('SECOND!')),
      third.pipe(mapTo('THIRD')),
      fourth.pipe(mapTo('FOURTH'))
    );
    
    const subscribe = example.subscribe(val => console.log(val));
    // 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
    

    3. race

    注意:此方法为接收第一个返回的observable的值,其他的丢弃

    import { mapTo } from 'RxJS/operators';
    import { interval } from 'RxJS/observable/interval';
    import { race } from 'RxJS/observable/race';
    
    // 接收第一个发出值的 observable
    const example = race(
      // 每1.5秒发出值
      interval(1500),
      // 每1秒发出值
      interval(1000).pipe(mapTo('1s won!')),
      // 每2秒发出值
      interval(2000),
      // 每2.5秒发出值
      interval(2500)
    );
    const subscribe = example.subscribe(val => console.log(val));
    // 输出: "1s won!"
    

    4. startWith

    注意:给定的第一个值

    import { startWith } from 'RxJS/operators';
    import { of } from 'RxJS';
    
    // 发出 (1,2,3)
    const source = of(1, 2, 3);
    // 从0开始
    const example = source.pipe(startWith(0));
    const subscribe = example.subscribe(val => console.log(val));
    // 输出: 0,1,2,3
    

    5. zip

    注意: zip操作符会订阅所有内部 observables,然后等待每个发出一个值。一旦发生这种情况,将发出具有相应索引的所有值。这会持续进行,直到至少一个内部 observable 完成。

    import { delay } from 'RxJS/operators';
    import { of, zip } from 'RxJS';
    
    const sourceOne = of('Hello');
    const sourceTwo = of('World!');
    const sourceThree = of('Goodbye');
    const sourceFour = of('World!');
    // 一直等到所有 observables 都发出一个值,才将所有值作为数组发出
    const example = zip(
      sourceOne,
      sourceTwo.pipe(delay(1000)),
      sourceThree.pipe(delay(2000)),
      sourceFour.pipe(delay(3000))
    );
    const subscribe = example.subscribe(val => console.log(val));
    // 输出: ["Hello", "World!", "Goodbye", "World!"]
    

    条件系列

    1. defaultIfEmpty

    注意:如果在完成时没有发出任何通知,那么发出给定的值

    import { defaultIfEmpty } from 'RxJS/operators';
    import { of } from 'RxJS';
    
    // 当源 observable 为空时,发出 'Observable.of() Empty!',否则发出源的任意值
    const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty!'));
    
    const subscribe = exampleOne.subscribe(val => console.log(val));
    // 输出: 'Observable.of() Empty!'
    

    2. every

    注意:如果完成时所有的值都能通过断言,那么发出 true,否则发出 false 。

    import { every } from 'RxJS/operators';
    import { of } from 'RxJS';
    
    // 发出5个值
    const source = of(1, 2, 3, 4, 5);
    const example = source.pipe(
      // 每个值都是偶数吗?
      every(val => val % 2 === 0)
    );
    
    const subscribe = example.subscribe(val => console.log(val));
    // 输出: false
    

    创建系列

    1. create

    注意:使用给定的订阅函数来创建 observable 。

    import { Observable } from 'RxJS';
    /*
      创建在订阅函数中发出 'Hello' 和 'World' 的 observable 。
    */
    const hello = Observable.create(function(observer) {
      observer.next('Hello');
      observer.next('World');
    });
    
    const subscribe = hello.subscribe(val => console.log(val));
    // 输出: 'Hello'...'World'
    

    2. empty

    注意: 立即完成的空的observable 。

    import { empty } from 'RxJS';
    
    const subscribe = empty().subscribe({
      next: () => console.log('Next'),
      complete: () => console.log('Complete!')
    });
    // 输出: 'Complete!'
    

    3. from

    注意:将数组、promise 或迭代器转换成 observable

    1. 数组转化
    import { from } from 'RxJS';
    
    // 将数组作为值的序列发出
    const arraySource = from([1, 2, 3, 4, 5]);
    // 输出: 1,2,3,4,5
    const subscribe = arraySource.subscribe(val => console.log(val));
    
    2. promise转化
    import { from } from 'RxJS';
    
    // 发出 promise 的结果
    const promiseSource = from(new Promise(resolve => resolve('Hello World!')));
    // 输出: 'Hello World'
    const subscribe = promiseSource.subscribe(val => console.log(val));
    

    4. interval

    注意:基于给定时间间隔发出数字序列。

    import { interval } from 'RxJS';
    
    // 每1秒发出数字序列中的值
    const source = interval(1000);
    
    const subscribe = source.subscribe(val => console.log(val));
    // 输出数字: 0,1,2,3,4,5....
    

    5. of

    注意:按顺序发出任意数量的值。

    import { of } from 'RxJS';
    // 依次发出提供的任意数量的值
    const source = of(1, 2, 3, 4, 5);
    
    const subscribe = source.subscribe(val => console.log(val));
    // 输出: 1,2,3,4,5
    

    6. throw

    注意: 在订阅上发出错误

    import { throwError } from 'RxJS';
    
    // 在订阅上使用指定值来发出错误
    const source = throwError('This is an error!');
    
    const subscribe = source.subscribe({
      next: val => console.log(val),
      complete: () => console.log('Complete!'),
      error: val => console.log(`Error: ${val}`)
    });
    // 输出: 'Error: This is an error!'
    

    7. timer

    注意:第二个参数时选传参数,传了之后,会按照第二个参数的毫秒数不停歇发送数据,如果不传,则只会发生一次

    import { timer } from 'RxJS';
    
    /*
      timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
      然后每2秒发出序列值
    */
    const source = timer(1000, 2000);
    // 输出: 0,1,2,3,4,5......
    const subscribe = source.subscribe(val => console.log(val));
    

    错误处理系列

    1. catchError

    注意: 优雅地处理 observable 序列中的错误

    import { throwError, of } from 'RxJS';
    import { catchError } from 'RxJS/operators';
    // 发出错误
    const source = throwError('This is an error!');
    // 优雅地处理错误,并返回带有错误信息的 observable
    const example = source.pipe(catchError(val => of(`I caught: ${val}`)));
    
    const subscribe = example.subscribe(val => console.log(val));
    // 输出: 'I caught: This is an error'
    

    2. retry/ retryWhen

    注意:如果发生错误,以指定次数重试 observable 序列 / 当发生错误时,基于自定义的标准来重试 observable 序列。

    1. retry
    import { interval, of, throwError } from 'RxJS';
    import { mergeMap, retry } from 'RxJS/operators';
    
    // 每1秒发出值
    const source = interval(1000);
    const example = source.pipe(
      mergeMap(val => {
        // 抛出错误以进行演示
        if (val > 5) {
          return throwError('Error!');
        }
        return of(val);
      }),
      // 出错的话可以重试2次
      retry(2)
    );
    
    const subscribe = example.subscribe({
      next: val => console.log(val),
      error: val => console.log(`${val}: Retried 2 times then quit!`)
    });
    /*
      输出:
      0..1..2..3..4..5..
      0..1..2..3..4..5..   重试的第一次
      0..1..2..3..4..5..    重试的第二次
      "Error!: Retried 2 times then quit!"
    */
    
    2. retryWhen
    当发生错误时,基于自定义的标准来重试 observable 序列。
    import { timer, interval } from 'RxJS';
    import { map, tap, retryWhen, delayWhen } from 'RxJS/operators';
    
    // 每1秒发出值
    const source = interval(1000);
    const example = source.pipe(
      map(val => {
        if (val > 5) {
          // 错误将由 retryWhen 接收
          throw val;
        }
        return val;
      }),
      retryWhen(errors =>
        errors.pipe(
          // 输出错误信息
          tap(val => console.log(`Value ${val} was too high!`)),
          // 5秒后重启
          delayWhen(val => timer(val * 1000))
        )
      )
    );
    
    const subscribe = example.subscribe(val => console.log(val));
    /*
      输出:
      0
      1
      2
      3
      4
      5
      "Value 6 was too high!"
      --等待5秒后然后重复此过程
    */
    
    

    多播系列

    1. share

    注意: 在多个订阅者间共享源 observable 。

    import { timer } from 'RxJS';
    import { tap, mapTo, share } from 'RxJS/operators';
    
    // 1秒后发出值
    const source = timer(1000);
    // 输出副作用,然后发出结果
    const example = source.pipe(
      tap(() => console.log('***SIDE EFFECT***')),
      mapTo('***RESULT***')
    );
    
    const subscribe = example.subscribe(val => console.log(val));
    const subscribeTwo = example.subscribe(val => console.log(val));
    /*
      ***不共享的话,副作用会执行两次***
      输出: 
      "***SIDE EFFECT***"
      "***RESULT***"
      "***SIDE EFFECT***"
      "***RESULT***"
    */
    
    // 在多个订阅者间共享 observable
    const sharedExample = example.pipe(share());
    
    const subscribeThree = sharedExample.subscribe(val => console.log(val));
    const subscribeFour = sharedExample.subscribe(val => console.log(val));
    /*
       ***共享的话,副作用只执行一次***
      输出:
      "***SIDE EFFECT***"
      "***RESULT***"
      "***RESULT***"
    */
    

    2. shareReplay

    注意:共享源 observable 并重放指定次数的发出。
    为什么使用 shareReplay?
    通常啊,当有副作用或繁重的计算时,你不希望在多个订阅者之间重复执行时,会使用 shareReplay 。 当你知道流的后来订阅者也需要访问之前发出的值,shareReplay 在这种场景下也是有价值的。 这种在订阅过程中重放值的能力是区分 share 和 shareReplay 的关键。

    import { Subject, ReplaySubject } from 'RxJS';
    import { pluck, share, shareReplay, tap } from 'RxJS/operators';
    
    // 使用 subject 模拟 url 的变化
    const routeEnd = new Subject<{data: any, url: string}>();
    // 提取 url 并与后来订阅者共享
    const lastUrl = routeEnd.pipe(
      tap(_ => console.log('executed')),
      pluck('url'),
      // 默认为所有值,因此我们将其设置为仅保留并重播最后一个值
      shareReplay(1)
    );
    // 起始订阅者是必须的
    const initialSubscriber = lastUrl.subscribe(console.log)
    // 模拟路由变化
    // 输出: 'executed', 'my-path'
    routeEnd.next({data: {}, url: 'my-path'});
    // 输出: 'my-path'
    const lateSubscriber = lastUrl.subscribe(console.log);
    

    过滤系列

    1. debounceTime

    注意:舍弃掉在两次输出之间小于指定时间的发出值

    import { fromEvent, timer } from 'RxJS';
    import { debounceTime, map } from 'RxJS/operators';
    
    const input = document.getElementById('example');
    
    // 对于每次键盘敲击,都将映射成当前输入值
    const example = fromEvent(input, 'keyup').pipe(map(i => i.currentTarget.value));
    
    // 在两次键盘敲击之间等待0.5秒方才发出当前值,
    // 并丢弃这0.5秒内的所有其他值
    const debouncedInput = example.pipe(debounceTime(500));
    
    // 输出值
    const subscribe = debouncedInput.subscribe(val => {
      console.log(`Debounced Input: ${val}`);
    });
    

    2. distinctUntilChanged

    注意:只有当当前值与之前最后一个值不同时才将其发出。

    import { from } from 'RxJS';
    import { distinctUntilChanged } from 'RxJS/operators';
    
    // 基于最新发出的值进行比较,只输出不同的值
    const myArrayWithDuplicatesInARow = from([1, 1, 2, 2, 3, 1, 2, 3]);
    
    const distinctSub = myArrayWithDuplicatesInARow
      .pipe(distinctUntilChanged())
      .subscribe(val => console.log('DISTINCT SUB:', val));
      // 输出: 1,2,3,1,2,3
    
    const nonDistinctSub = myArrayWithDuplicatesInARow
      .subscribe(val => console.log('NON DISTINCT SUB:', val));
       // 输出 : 1,1,2,2,3,1,2,3
    

    3. filter

    注意: 发出符合给定条件的值。

    import { from } from 'RxJS';
    import { filter } from 'RxJS/operators';
    
    // 发出 (1,2,3,4,5)
    const source = from([1, 2, 3, 4, 5]);
    // 过滤掉奇数
    const example = source.pipe(filter(num => num % 2 === 0));
    // 输出: "Even number: 2", "Even number: 4"
    const subscribe = example.subscribe(val => console.log(`Even number: ${val}`));
    

    4. take

    注意:在完成前发出N个值(N由参数决定)。
    为什么使用 take?
    当只对开头的一组值感兴趣时,你想要的便是 take 操作符。也许你想看看当用户第一次进入页面时,用户首先点击的是什么,你想要订阅点击事件并只取首个值。举例来说,你想要观看赛跑,但其实你只对首先冲过终点的人感兴趣。此操作符很清晰明了,你想要取开头n个值。

    import { of } from 'RxJS';
    import { take } from 'RxJS/operators';
    
    // 发出 1,2,3,4,5
    const source = of(1, 2, 3, 4, 5);
    // 取第一个发出的值然后完成
    const example = source.pipe(take(1));
    // 输出: 1
    const subscribe = example.subscribe(val => console.log(val));
    

    5. takeUntil

    注意:发出值,直到提供的 observable 发出值,它便完成。

    import { interval, timer } from 'RxJS';
    import { takeUntil } from 'RxJS/operators';
    
    // 每1秒发出值
    const source = interval(1000);
    // 5秒后发出值
    const timer$ = timer(5000);
    // 当5秒后 timer 发出值时, source 则完成
    const example = source.pipe(takeUntil(timer$));
    // 输出: 0,1,2,3
    const subscribe = example.subscribe(val => console.log(val));
    

    转换系列

    1. bufferTime

    注意:收集发出的值,直到经过了提供的时间才将其作为数组发出。

    import { interval } from 'RxJS';
    import { bufferTime } from 'RxJS/operators';
    
    // 创建每500毫秒发出值的 observable
    const source = interval(500);
    // 2秒后,将缓冲值作为数组发出
    const example = source.pipe(bufferTime(2000));
    // 打印值到控制台
    // 输出: [0,1,2]...[3,4,5,6]
    const subscribe = example.subscribe(val =>
      console.log('Buffered with Time:', val)
    );
    

    2. concatMap

    注意:将值映射成内部 observable,并按顺序订阅和发出
    注意 concatMap 和 mergeMap 之间的区别。 因为 concatMap 之前前一个内部 observable 完成后才会订阅下一个, source 中延迟 2000ms 值会先发出。 对比的话, mergeMap 会立即订阅所有内部 observables, 延迟少的 observable (1000ms) 会先发出值,然后才是 2000ms 的 observable 。

    import { of } from 'RxJS';
    import { concatMap, delay, mergeMap } from 'RxJS/operators';
    
    // 发出延迟值
    const source = of(2000, 1000);
    // 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
    const example = source.pipe(
      concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
    );
    // 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
    const subscribe = example.subscribe(val =>
      console.log(`With concatMap: ${val}`)
    );
    
    // 展示 concatMap 和 mergeMap 之间的区别
    const mergeMapExample = source
      .pipe(
        // 只是为了确保 meregeMap 的日志晚于 concatMap 示例
        delay(5000),
        mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
      )
      .subscribe(val => console.log(`With mergeMap: ${val}`));
    

    3. map

    注意:对源 observable 的每个值应用投射函数

    import { from } from 'RxJS';
    import { map } from 'RxJS/operators';
    
    // 发出 (1,2,3,4,5)
    const source = from([1, 2, 3, 4, 5]);
    // 每个数字加10
    const example = source.pipe(map(val => val + 10));
    // 输出: 11,12,13,14,15
    const subscribe = example.subscribe(val => console.log(val));
    

    4. mergeMap / flatMap ( flatMap 是 mergeMap 的别名)

    注意:映射成 observable 并发出值。 使用场景,比如调用多个接口以后,将几个接口值进行合并等操作

    import { of } from 'RxJS';
    import { mergeMap } from 'RxJS/operators';
    
    // 发出 'Hello'
    const source = of('Hello');
    // 映射成 observable 并将其打平
    const example = source.pipe(mergeMap(val => of(`${val} World!`)));
    // 输出: 'Hello World!'
    const subscribe = example.subscribe(val => console.log(val));
    

    5. scan

    注意:随着时间的推移进行归并。用于几个接口值的累加

    import { of } from 'RxJS';
    import { scan } from 'RxJS/operators';
    
    const source = of(1, 2, 3);
    // 基础的 scan 示例,从0开始,随着时间的推移计算总数
    const example = source.pipe(scan((acc, curr) => acc + curr, 0));
    // 输出累加值
    // 输出: 1,3,6
    const subscribe = example.subscribe(val => console.log(val));
    

    6. switchMap

    注意:映射成 observable,完成前一个内部 observable,发出值。用于短时间多次重复调用一个接口,但是只读取最后一次的场景,比如实时搜索框搜索。

    import { interval, fromEvent } from 'RxJS';
    import { switchMap, mapTo } from 'RxJS/operators';
    
    // 发出每次点击
    const source = fromEvent(document, 'click');
    // 如果3秒内发生了另一次点击,则消息不会被发出
    const example = source.pipe(
      switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
    );
    // (点击)...3s...'Hello I made it!'...(点击)...2s(点击)...
    const subscribe = example.subscribe(val => console.log(val));
    

    工具系列

    1. do / tap

    注意:透明地执行操作或副作用,比如打印日志。

    import { of } from 'RxJS';
    import { tap, map } from 'RxJS/operators';
    
    const source = of(1, 2, 3, 4, 5);
    // 使用 tap 透明地打印 source 中的值
    const example = source.pipe(
      tap(val => console.log(`BEFORE MAP: ${val}`)),
      map(val => val + 10),
      tap(val => console.log(`AFTER MAP: ${val}`))
    );
    
    // 'tap' 并不转换值
    const subscribe = example.subscribe(val => console.log(val));
    // 输出: 11...12...13...14...15
    

    2. delay

    注意:根据给定时间延迟发出值。

    import { of, merge } from 'RxJS';
    import { mapTo, delay } from 'RxJS/operators';
    
    // 发出一项
    const example = of(null);
    // 每延迟一次输出便增加1秒延迟时间
    const message = merge(
      example.pipe(mapTo('Hello')),
      example.pipe(
        mapTo('World!'),
        delay(1000)
      ),
      example.pipe(
        mapTo('Goodbye'),
        delay(2000)
      ),
      example.pipe(
        mapTo('World!'),
        delay(3000)
      )
    );
    // 输出: 'Hello'...'World!'...'Goodbye'...'World!'
    const subscribe = message.subscribe(val => console.log(val));
    

    3. toPromise

    注意:将 observable 转换成 promise 。

    // 返回基础的 observable
    const sample = val => Rx.Observable.of(val).delay(5000);
    // 将基础的 observable 转换成 promise
    const example = sample('First Example')
      .toPromise()
      // 输出: 'First Example'
      .then(result => {
        console.log('From Promise:', result);
      });
    

    如果您想全面学习RxJS,请前往RxJS中文网

    本文作者原创,仅供学习交流使用,转载需注明出处。

    相关文章

      网友评论

        本文标题:RxJS 用法(操作符)总结

        本文链接:https://www.haomeiwen.com/subject/fokkvctx.html