美文网首页
深入浅出Rxjs笔记 四

深入浅出Rxjs笔记 四

作者: 月半女那 | 来源:发表于2019-10-09 18:28 被阅读0次

    合并数据流

    将多个数据流中的数据会和到一个数据流中,途中只展示了两个上游数据流

    需求 操作符
    将多个数据流以首位项链方式合并 concat / concatAll
    将多个数据流中的数据以先到先得方式合并 merge / mergeAll
    将多个数据流中的数据以一一对应的方式合并 zip/ zipAll
    持续合并多个数据流中最新产生的数据 combineLatest/ combineAll / withLatestFrom
    从多个数据流中选取第一个产生内容的数据流 race
    在数据流前面添加一个指定数据 startWith
    从高阶数据流中切换数据源 switch / exhaust

    1.concat:首尾相连

    因为concat开始从下一个Observable对象抽取数据只能在前一个Observable对象完结之后,所以参与到这个concat之中的Observable对象应该都是完结的,如果Observable对象永不完结,那排在后面的Observable对象永远没有上场的机会

    import { of, concat ,range } from 'rxjs';
    const data = of(1, 2, 3);
    const data2 = of(4, 5, 6);
    concat(data, data2).subscribe(
        value => console.log(value),
        err => { },
        () => console.log('...and it is done!')
    )
    
    // 1
    // 2
    // 3
    // 4
    // 5
    // 6
    // ...and it is done!
    concat(range(1,2),range(30,3)).subscribe(
        value => console.log(value),
        err => { },
        () => console.log('range...and it is done!')
    )
    // 1
    // 2
    // 30
    // 31
    // 32
    // range...and it is done!
    

    2.merge:先到先得

    merge会第一时间订阅所有的上游Observable,然后对上游的数据采取‘先到先得’的策略。任何一个Observable只要数据推下来,就会立刻转给下游Observable对象
    如果上游某个Observable对象不能完结,并不影响其他Observable对象的数据传给merge的下游,merge只有在上游数据d都完结的情况下才会完结自己产生Observable对象。
    一般用来合并异步数据流,如果是同步的效果和concat相似,

    // 异步
    import { merge, interval , of  } from 'rxjs';
    import { map } from 'rxjs/operators';
    const timer1 = interval(1000).pipe(map(x => x * 2 + 2));
    const timer2 = interval(1000).pipe(map(x => x * 3 + 3 ));
    merge(timer1, timer2).subscribe(
        value => console.log(value),
        err => { },
        () => console.log('...and it is done!')
    )
    //2
    //3
    //4
    //.....
    const timer3 = interval(500).pipe(map(x => x * 11 +'c' ));
    merge(timer1, timer2,timer3).subscribe(
        value => console.log(value),
        err => { },
        () => console.log('...and it is done!')
    )
    // 0c
    // 0a
    // 0b
    // 11c
    // 22c
    
    // 同步
    const of1 = of(1, 2, 3);
    const of2 = of(4, 5, 6);
    merge(of1, of2).subscribe(
        value => console.log(value),
        err => { },
        () => console.log('...and it is done!')
    )
    // 1
    // 2
    // 3
    // 4
    // 5
    // 6
    // ...and it is done!
    

    3.zip:拉链式组合

    一对一的合并
    zip和merge/concat不同,zip会将上游的数据转化成数组形式,在zip执行时,他会把上游的数据转化为数组形式,如果对应的一个Observable么有吐出数据,则会一直等待,直到吐出数据,然后将这两个数据合并,传递给下游,
    由于a3是一个定时器,所以a1,a2会一直等待,知道a3吐出数据,会造成数据挤压的问题,对于数据量比较小的Observable对象,还是可以忍受的,但是一旦数据量变大,就需要考虑潜在的内存压力问题

    import { zip, of ,interval} from 'rxjs';
    import {map} from 'rxjs/operators'
    const a1 = of(1,2,3);
    const a2 = of('a','b','c');
    const a3 = interval(1000).pipe(map(x => x +'true'))
    zip(a1,a2,a3).subscribe(x => console.log(x))
    // [ 1, 'a', '0true' ]
    // [ 2, 'b', '1true' ]
    // [ 3, 'c', '2true' ]
    

    4. combineLatest:合并最后一个数据

    • combineLatest会反复使用上游产生的最新数据,只要上游不产生新数据,就会被反复使用这个上游最后一次产生的数据
    • combineLatest会顺序订阅所有上游的Observable对象,只有所有上游Observable对象都已经吐出数据了,才会给下游传递所有上游最新数据组合的数据。
    • 多重依赖问题
      combineLatest会有这样的缺陷,是由于多个上游Observable同时吐出一个数据,但是输出的结果和预想的不一样,这是因为他们是同一个数据源引发的,
    import { combineLatest, timer, of } from 'rxjs';
    import { map } from 'rxjs/operators';
    const firsttimer = timer(0, 100);
    const secondtimer = timer(300, 500);
    const combine = combineLatest(firsttimer, secondtimer);
    combine.subscribe(value => console.log(value))
    // [ 2, 0 ]
    // [ 3, 0 ]
    // [ 4, 0 ]
    // [ 5, 0 ]
    // [ 6, 0 ]
    // [ 7, 0 ]
    // [ 7, 1 ]
    // [ 8, 1 ]
    // [ 9, 1 ]
    
    const a = of(1,2,3);
    const b = of(4,5,6);
    const c = combineLatest(a,b);
    c.subscribe(value => console.log(value))
    // [ 3, 4 ]
    // [ 3, 5 ]
    // [ 3, 6 ]
    
    // 多重依赖问题
    const source1 = timer(0,100);
    const d = source1.pipe(map( x => x * 10))
    const e = source1.pipe(map( x => x * 20))
    const f = combineLatest(d ,e);
    f.subscribe(value => console.log(value))
    // [ 0, 0 ]
    // [ 10, 0 ]
    // [ 10, 20 ]
    // [ 20, 20 ]
    

    5. withLatestFrom

    withLatestFrom给下游推送数据只能有一个上游Observable对象驱动。由于concat,merge,zip和combineLatest作为输入的Observable对象地位都是对等的,但是withLastetFrom却不是这样的,调用withlatestForm的Observable对象有主导数据产生节奏的作用,而作为参数的Observable对象只能贡献数据,不能控制产生数据的时机.

    import { withLatestFrom, map } from 'rxjs/operators';
    import { timer, of } from 'rxjs';
    const a = timer(0, 100);
    const c = a.pipe(map(x => x ));
    const d = a.pipe(withLatestFrom(c));
    d.subscribe(value => console.log(value))
    // [ 0, 0 ]
    // [ 1, 1 ]
    // [ 2, 2 ]
    // [ 3, 3 ]
    
    const e = of(1, 2, 3);
    const f = of(4, 5, 6);
    e.pipe(withLatestFrom(f)).subscribe(value => console.log(value))
    // [ 1, 6 ]
    // [ 2, 6 ]
    // [ 3, 6 ]
    

    如果要合并完全独立的Observable对象,使用combineLatest,当要求吧一个Observable对象映射成新的数据流,并且从其他Observable对象获取最新数据就使用withLatestFrom

    <!DOCTYPE html>
    <html lang="en">
    
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <meta http-equiv="X-UA-Compatible" content="ie=edge">
        <title>Document</title>
        <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.12/Rx.min.js"></script>
    </head>
    
    <body>
        <div id='test'></div>
    </body>
    <script>
        // import { fromEvent } from 'rxjs';
        // import { combineLatest, timer, of } from 'rxjs';
        const clicks = Rx.Observable.fromEvent(document, 'click');
        const x = clicks.map(e => e.x);
        const y = clicks.map(e => e.y);
        const res = x.combineLatest(y, (x,y) => `combineLatest--x:${x}--y:${y}`);
        res.subscribe(value => {
            console.log(value);
            document.getElementById('test').innerText += value;
        })
    
        x.withLatestFrom(y, (x,y) => `withLatestFrom--x:${x}--y:${y}`).subscribe(value => {
            console.log(value);
            document.getElementById('test').innerText += value;
        });
    <!-- combineLatest--x:782--y:383
    index.html:28 withLatestFrom--x:782--y:383
    index.html:23 combineLatest--x:646--y:383
    index.html:23 combineLatest--x:646--y:341
    index.html:28 withLatestFrom--x:646--y:341 -->
    </script>
    
    </html>
    
    

    相关文章

      网友评论

          本文标题:深入浅出Rxjs笔记 四

          本文链接:https://www.haomeiwen.com/subject/bxpfpctx.html