美文网首页前端周末
Rxjs入门与初步应用

Rxjs入门与初步应用

作者: 琢磨先生lf | 来源:发表于2019-04-25 20:33 被阅读0次

    学习资料

    https://cn.rx.js.org/
    https://github.com/RxJS-CN/rxjs-articles-translation
    https://www.jianshu.com/p/eaf28d5ce6c0

    概念

    Observable 和 Observer

    Observable 可观察者对象,负责发出数据;Observer 观察者,负责接收数据
    怎么这么抽象?试着把Observable对象理解为一种数据格式,类似于数组、链表等等
    Observable实现了下⾯两种设计模式:

    • 观察者模式(Observer Pattern)
    • 迭代器模式(Iterator Pattern)

    观察者模式

    定义:观察者模式定义了对象之间的一对多依赖,这样一来,当一个对象改变状态时,它的所有依赖着都会收到通知并自动更新
    观察者模式对“治”这个问题提的解决⽅法是这样,将逻辑分为发布者(Publisher)和观察者(Observer),其中发布者只管负责产⽣事件,它会通知所有注册挂上号的观察者,⽽不关⼼这些观察者如何处理这些事件,相对的,观察者可以被注册上某个发布者,只管接收到事件之后就处理,⽽不关⼼这些数据是如何产⽣的。


    发布者和观察者的关系发布者和观察者的关系
    // Subject
    function Observable() {
      this.observers = [];
      this.data = 'data update';
      // 用于关联observer
      this.addObserve = function(observe) {
        this.observers.push(observe)
      };
      // 用于取消关联observer
      this.removeObserve = function(observe) {
        const index = this.observers.findIndex(item => item == observe);
        this.observers.splice(index,1);
      }
    
      this.notifyObservers = function () {
        for (let i=0; i<this.observers.length; i++) {
          this.observers[i].update(this.data);
        }
      }
        // 用于发送更新数据到observer
      this.updateData = function(data) {
        this.data = data;
        this.notifyObservers();
      }
    }
    
    // 所有 observer 都必须实现update方法
    function Observer() {
      this.update = function (data) {
        console.log(data);
      }
    }
    
    const observable = new Observable();
    const observer1 = new Observer();
    const observer2 = new Observer();
    // 建立关联
    observable.addObserve(observer1);
    observable.addObserve(observer2);
    observable.notifyObservers();
    observable.updateData(123);
    
    

    迭代器模式

    定义:提供一种方法顺序访问一个聚合对象中的各个元素,而又不暴露其内部的表示
    数据集合的实现⽅式很多,可以是⼀个数组,也可以是⼀个树形结构,也可以是⼀个单向链表……迭代器的作⽤就是提供⼀个通⽤的接⼜,让使⽤者完全不⽤关⼼这个数据集合的具体实现⽅式

    function Iterator(list) {
      this.list = list;
      this.porsition = 0;
      this.hasNext = function() {
        if (this.porsition > this.list.length || this.list[porsition] === null) return false;
        return true;
      }
      this.next = function() {
        const item = this.list[this.porsition];
        this.porsition++;
        return item;
      }
      this.isDone = function() {
        return this.porsition >= this.list.length;
      }
    }
    
    

    在使⽤RxJS的过程中绝对看不到类似这样的代码,实际上,你都看不到上⾯所说的三个函数,因为,上⾯所说的是“拉”式的迭代器实现,⽽RxJS实现的是“推”式的迭代器实现

    Subscription

    订阅,表示建立关联关系

    理解Observable、Observe 和 Subscribtions 之间的关系

    Observable 是信号源、生产者,Observer 观察者、消费者
    Observable 和 Observer 之间的关系:
    如果比作报社和读者,报社是 Observable,是数据源,提供报纸,读者是 Observer,负责消费(处理)数据,阅读报纸,读者向报社订阅(subscribe)报纸后,报社将读者列入他们派送名单,定期派送报纸,报纸是数据。
    可以比作前端(Observer)通过 登陆 owncloud 账号(subscribe)时时获取 UI(Observable)更新的 UI稿(数据)
    也可以是爱奇艺会员(Observer)点击播放爱奇艺视频(subscribe)观看爱奇艺网站(Observable)提供的视频(数据)
    Observable、Observe 和 Subscribtions核心就是解决分工与数据传递。

    subscribe 扩展知识

    可以通过add、remove 操作子subscription
    父subscription取消订阅,子subscription也会一起取消订阅

    var observ1 = Rx.Observable.interval(500)
    var observ2 = Rx.Observable.interval(800)
    var observ3 = Rx.Observable.interval(800)
    
    var subsc1 = observ1.subscribe(x => console.log('first: ' + x))
    var subsc2 = observ2.subscribe(x => console.log('second: ' + x))
    var subsc3 = observ3.subscribe(x => console.log('three: ' + x))
    
    subsc1.add(subsc2)
    subsc1.add(subsc3)
    
    setTimeout(() => {
      subsc1.remove(subsc2)
      subsc1.unsubscribe()
    }, 1100)
    // second: n将会一直执行下去,我们添加了 subsc1.add(subsc2) , 在1.1S后移除了它,所以在 unsubscribe() 时,我们并没有清除掉它
    
    

    unsubscribe:释放资源和取消Observable执行的功能

    Operators

    操作符,类似于管道,对数据源发出的数据进行过滤或其他处理,使数据源发出的数据更加满足Observe 的需求

    Subject

    有一些场景,需要将Cold Observable 转成 Hot Observable,在这个场景下需要⼀个“中间⼈”做串接的事情,这个中间⼈有两个职责:

    • 中间⼈要提供subscribe⽅法,让其他⼈能够订阅⾃⼰的数据源。
    • 中间⼈要能够有办法接受推送的数据,包括Cold Observable推送的数据。

    上⾯所说的第⼀个职责,相当于⼀个Observable,第⼆个⼯作,相当于⼀个Observer。在RxJS中,提供了名为Subject的类型,⼀个Subject既有Observable的接口,也具有Observer的接口,⼀个Subject就具备上述的两个职责。

    Cold Observable 和 Hot Observable的区别

    好比视频和电视频道的区别,视频没有时间限制,任何时候想看都可以看,电视有时间限制
    视频是 Cold Observable,电视是 Hot Observable
    Cold Observable:你见或者不见,我一直在,只要你愿意去取的话
    Hot Observable:过了这个村,没有这个店

    var interval$ = Rx.Observable.interval(500);
    
    // Cold Observable
    interval$.map(val=>'a'+val).subscribe(x => console.log(x));
    setTimeout(()=>{
      interval$.map(val=>'b'+val).subscribe(x => console.log(x));
    }, 2000);
    
    // Hot Observable
    const subject$ = new Rx.Subject();
    interval$.subscribe(subject$);
    subject$.map(val=>'a'+val).subscribe(x => console.log(x));
    setTimeout(() => {
      subject$.map(val=>'b'+val).subscribe(x => console.log(x));
    }, 1500);
    
    

    回过头来讲subject,subject相当于一个转换器,它将 Cold Observable 转化成 Hot Observable。这就要求subject同时是observe又是 observable。subject 既能订阅数据源,同时本身又是数据源,能发出数据。
    举个不十分恰当的例子,比如手机,它可以接收基站信号(Observe),同时也可以发出信号(Observable)

    Schedulers

    调度器
    Scheduer是⼀种数据结构,可以根据优先级或者其他某种条件来安排任务执⾏队列
    https://www.jianshu.com/p/5624c8a6bd2b

    类型 执行类型 内部调用
    queue Sync同步的方式 scheduler.schedule(task, delay) scheduler.flush()
    asap Async(异步微任务) Promise.resolve().then(() => task)
    async Async(异步宏任务) id = setInterval(task, delay) clearInterval(id)
    animationFrame Async id = requestAnimationFrame(task) cancelAnimationFrame(id)

    为什么要学

    我们学习RxJS,并不是因为RxJS是⼀项炫酷的技术,也不是因为RxJS是⼀个最新的技术,是因为RxJS的的确确能够帮助我们解决问题,⽽且这些问题长期以来⼀直在困扰我们,没有好的解决办法,这些问题包括:

    • 如何控制⼤量代码的复杂度;
    • 如何保持代码可读;
    • 如何处理异步操作。

    Rxjs 引用了两个重要的编程思想,让代码更加清爽,更加容易维护:
    函数式
    响应式

    函数式

    • 声明式

    声明式区别于命令式,命令式强调的是告诉机器怎么去做(how),一步步的告诉计算机如何完成一项工作
    声明式强调的是告诉机器你想要什么(what),不关注内部实现,声明式把通用的共性抽离出来,避免重复代码
    应用声明式的困难点:归纳和提取完备的what,是件很困难、很技术化的工作,令人望而却步声明式能够应用在特定领域如SQL中,是工具的编写者,已经归纳和提取what,替你完成了

    // 命令式
    // how:写for循环一一处理
    function double(arr) {
      const results = []
      for (let i = 0; i < arr.length; i++){
      results.push(arr[i] * 2)
      }
      return results
    }
    function addOne(arr) {
      const results = []
      for (let i = 0; i < arr.length; i++){
      results.push(arr[i] + 1)
      }
      return results
    }
    // 声明式
    // what:通过map把每一项加倍或+1,不关注内部实现
    function double(arr) {
      return arr.map(function(item) {return item * 2});
    } function addOne(arr) {
      return arr.map(function(item) {return item + 1});
    }
    
    • 纯函数:

    函数的执⾏过程完全由输⼊参数决定,不会受除参数之外的任何数据影响,只要入参不变,返回的参数也不会变
    函数不会修改任何外部状态,⽐如修改全局变量或传⼊的参数对象
    纯函数没有副作用,是稳定的,可以和其他纯函数像搭积木一样一起组合使用,获得更强的处理能力

    • 数据不可变性

    有数据,替换⽅法是通过产⽣新的数据,来实现这种"变化",也就是说,当我们需要数据状态发⽣改变时,保持原有数据不变,产⽣⼀个新的数据来体现这种变化

    JavaScript中数组的push、pop、sort函数都会改变⼀个数组的内容,由此引发的bug可不少。这些不纯的函数导致JavaScript天⽣不是⼀个纯粹意义上的函数式编程语⾔

    响应式

    EXCEL 中的公式就是典型的响应式,数据改变了公式计算结果也会跟着变
    类似于MVVM中的M->V

    体验两个小例子:

    测试鼠标按住时间

    const buttonDom = document.querySelector('#button');
    const mouseDown$ = Rx.Observable.fromEvent(buttonDom, 'mousedown');
    const mouseUp$ = Rx.Observable.fromEvent(buttonDom, 'mouseup');
    const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(), (mouseUpEvent, mouseDownEvent)=>{
        return mouseUpEvent.timestamp - mouseDownEvent.timestamp;
    });
    holdTime$.subscribe((ms)=>{
        document.querySelector('#holdTime').innerText = ms;
    });
    
    

    takeUntil,统计5秒内用户点击数

    const click$ = Rx.Observable.fromEvent(document, 'click');
    click$.bufferWhen(()=>Rx.Observable.interval(5000)).subscribe(arr=>console.log(arr.length))
    

    弹珠图

    弹珠图可以用来表示数据流,例如:

    --a---b-c---d---X---|->
    a, b, c, d 表示发出的数据
    X 表示错误
    |表示 '结束' 信号
    ---> 是时间轴

    弹珠图在线演示:https://rxviz.com/

    操作符

    创建

    功能需求 适用的操作符
    直接操作观察者 create
    根据有限的数据产生同步数据流 of
    产生一个数值范围内的数据 range
    以循环方式产生数据 generate
    重复产生数据流中的数据 repeat 和 repeatWhen
    产生空数据流 empty
    产生直接出错的数据流 throw
    产生永远不完结的数据流 never
    间隔给定时间持续产生数据 interval 和 timer
    从数组等枚举类型数据产生数据流 from
    从Promise 对象产生数据流 fromPromise
    从外部事件对象产生数据流 fromEvent 和 fromEventPattern
    从Ajax 请求结果产生数据流 ajax
    延迟产生数据流 defer

    from 和 toArray
    from:数组转 Observable
    toArray:Observable 转数组
    fromPromise 和 toPromise
    fromPromise:promise转 Observable
    toPromise:Observable 转promise

    合并

    功能需求 使用的操作符
    把多个数据流以首位相连的方式合并 concat 和 concatAll
    把多个数据流中数据以先到先得方式合并 merge 和 mergeAll
    把多个数据流中的数据以一一对应的方式合并 zip 和 zipAll
    持续合并多个数据流中最新产生的数据 combineLatest、combineAll 和 withLatestFrom
    从多个数据流中选取第一个产生内容的数据流 race
    在数据流前面添加一个指定数据 startWith
    只获取多个数据流最后产生的那个数据 forkJoin
    从高阶数据流中切换数据源 switch 和 exhaust

    对of产生的数据进行concat和merge操作哦产生的不同结果

    例1:merge:事件的合并处理

    startWith和concat的关联关系
    zip
    拉链,一对一咬合

    QQ截图20190417140650.pngQQ截图20190417140650.png QQ图片20190417141214.pngQQ图片20190417141214.png

    例2:zip应用

    • 让of产生的数据流交叉输出
    • 实现异步队列

    forkJoin
    forkJoin就是RxJS界的Promise.all,Promise.all等待所有输⼊的Promise对象成功之后把结果合并,forkJoin等待所有输⼊的Observable对象完结之后把最后⼀个数据合并

    const testList = [
      this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:1},params)),
      this.httpService.post(REQUEST_URL.editCourseTestListInfo, Object.assign({testType:2},params)),
    ];
    Observable.create(observer => {
      forkJoin(testList).subscribe((data)=>{
        observer.next(data);
      });
    })subscribe((res)=>{
      if(res.some(data=>data==false)) return;
      // 1-入门测成绩, 2-出门测成绩
      this.scoreTestType = {
        1: { scoreTestDetail: res[0] },
        2: { scoreTestDetail: res[1] }
      };
    })
    

    辅助

    功能需求 使用的操作符
    统计数据流中产生的所有数据个数 count
    获得数据流中最大或最小的数据 max 和 min
    对数据流中所有数据进行规约操作 reduce
    判断是否所有数据满足某个判定条件 every
    找到第一个满足判定条件的数据 find 和 findIndex
    判断一个数据流是否不包含任何数据 isEmpty
    如果一个数据流为空就默认产生一个指定数据 defaultEmpty

    数学类操作符有四个:count、max、min、reduce
    遍历上游Observable对象中吐出的所有数据才给下游传递数据、只有在上游完结的时候,才给下游传递唯⼀数据

    过滤

    功能需求 使用的操作符
    过滤掉不满足判定条件的数据 filter
    获得满足判定条件的第一个数据 first
    获得满足判定条件的最后一个数据 last
    从数据流中选取最先出现的若干数据 take
    从数据流中选取最后出现的若干数据 takeLast
    从数据流中选取数据直到某种情况发生 takeWhile 和 takeUntil
    从数据流中忽略最先出现的若干数据 skip
    从数据流中忽略数据直到某种情况发生 skipWhile 和 skipUntil
    基于时间的数据流量筛选 throttleTime、debounceTime 和 auditTime
    基于数据内容的数据流量筛选 throttle、debounce 和 audit
    基于采样方式的数据流量筛选 sample 和 sampleTime
    删除重复的数据 distinct
    删除重复的连续数据 distinctUntil 和 distinctUntilKeyChange
    忽略数据流中的所有数据 ignoreElement
    只选取指定出现位置的数据 elementAt
    判断是否只有一个数据满足判定条件 single

    takeUntil让我们可以⽤Observable对象作为notifier来控制另⼀个Observable对象的数据产⽣,使用起来非常灵活
    有损回压控制:throttle、debounce、audit、sample、throttleTime、debounceTime、auditTime、sampleTime
    对比:https://www.jianshu.com/p/a176d28c9eb5

    例3:防抖和节流

    debounce,去抖动。策略是当事件被触发时,设定一个周期延迟执行动作,若期间又被触发,则重新设定周期,直到周期结束,执行动作。 这是debounce的基本思想,在后期又扩展了前缘debounce,即执行动作在前,然后设定周期,周期内有事件被触发,不执行动作,且周期重新设定。

    // 暴力版
    var debounce = (fn, wait) => {
        let timer, timeStamp=0;
        let context, args;
     
        let run = ()=>{
            timer= setTimeout(()=>{
                fn.apply(context,args);
            },wait);
        }
        let clean = () => {
            clearTimeout(timer);
        }
     
        return function(){
            context=this;
            args = arguments;
            let now = (new Date()).getTime();
     
            if(now - timeStamp < wait){
                console.log('reset',now);
                clean();  // clear running timer 
                run();    // reset new timer from current time
            } else{
                console.log('set',now);
                run();    // last timer alreay executed, set a new timer
            }
            timeStamp = now;
        }
    }
    
    // rxls
    let foo$ = Rx.Observable.fromEvent(document, 'click');
    foo$.debounceTime(2000).subscribe(
      console.log,
      null,
      () => console.log('complete')
    );
    
    

    throttling,节流的策略是,固定周期内,只执行一次动作,若有新事件触发,不执行。周期结束后,又有事件触发,开始新的周期。 节流策略也分前缘和延迟两种。与debounce类似,延迟是指 周期结束后执行动作,前缘是指执行动作后再开始周期。
    throttling的特点在连续高频触发事件时,动作会被定期执行,响应平滑。

    // 简单版: 定时器期间,只执行最后一次操作
    var throttling = (fn, wait) => {
        let timer;
        let context, args;
     
        let run = () => {
            timer=setTimeout(()=>{
                fn.apply(context,args);
                clearTimeout(timer);
                timer=null;
            },wait);
        }
     
        return function () {
            context=this;
            args=arguments;
            if(!timer){
                console.log("throttle, set");
                run();
            }else{
                console.log("throttle, ignore");
            }
        }
    }
    
    // rxjs
    let foo$ = Rx.Observable.fromEvent(document, 'click');
    foo$.throttleTime(2000).subscribe(
      console.log,
      null,
      () => console.log('complete')
    );
    
    

    转化

    功能需求 使用的操作符
    将每个元素用映射函数产生新的数据 map
    将数据流中每个元素映射为同一数据 mapTo
    提取数据流中每个数据的某个字段 pluck
    产生高阶 Observable 对象 windowTime、windowCount、windowToggle 和window
    产生数组构成的数据流 bufferTime、BufferCount、bufferWhen、bufferToggle 和 buffer
    映射产生高阶 Observable 对象然后合并 concatMap、mergeMap(flatMap)、switchMap、exhaustMap
    产生规约运算结果组成的数据流 scan 和 mergeScan

    scan可能是RxJS中对构建交互式应⽤程序最重要的⼀个操作符,因为它能够维持应⽤的当前状态,⼀⽅⾯可以根据数据流持续更新这些状态,另⼀⽅⾯可以持续把更新的状态传给另⼀个数据流⽤来做必要处理。
    定义:public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable对源 Observable 使用累加器函数, 返回生成的中间值, 可选的初始值index 是赋给 acc 的初始值

    let foo$ = Rx.Observable.interval(1000);
    // acc 为上次返回值
    // cur 更新的值,此处由foo$提供
    foo$.scan((acc, cur) => {
        return cur
    }, 0).subscribe((data)=>console.log(data));
    
    

    acc 是上一个 scan 的返回值
    subscript data 显示的是当前值

    scan 和 reduce 的区别

    reduce需要数据结束才能输出结果
    scan可以输出中间状态

    无损回压控制

    数据组合成数组:bufferTime、bufferCount、bufferWhen、bufferToggle、buffer
    数据组合成Observable:windowTime、windowCount、windowToggle 和window

    bufferCount
    支持两个参数 bufferSize 和 startBufferEvery
    bufferSize 表示 缓存区长度,缓存区长度达到bufferSize的时候传新的数据给下游
    startBufferEvery 可选,表示 新的缓存区长度,即新数据个数,从上次bufferCount触发以后,上游每发出startBufferEvery个数据后向下游传出数据,数组中旧数据个数为bufferSize- startBufferEvery
    如果不填startBufferEvery,则默认值为 bufferSize,都是新数据
    如果startBufferEvery大于bufferSize,则会丢失startBufferEvery-bufferSize个数据

    例4: 判断连续输入是否正确

    召唤隐藏英雄

    const code = [
      "ArrowUp",
      "ArrowUp",
      "ArrowDown",
      "ArrowDown",
      "ArrowLeft",
      "ArrowRight",
      "ArrowLeft",
      "ArrowRight",
      "KeyB",
      "KeyA",
      "KeyB",
      "KeyA"
    ]
    
    Rx.Observable.fromEvent(document, 'keyup')
      .map(e => e.code)
      .bufferCount(12, 1)
      .subscribe(last12key => {
        if (_.isEqual(last12key, code)) {
          console.log('隐藏的彩蛋 \(^o^)/~')
        }
      });
    

    bufferToggle
    利⽤Observable来控制缓冲窗口的开和关
    有两个参数openings 和 closingSelector,openings 是一个Observable,控制每个缓冲窗口的开始时间,closingSelector是一个返回Observable的函数(这样能够灵活控制取值范围),控制每个缓冲窗口的结束时间(相对于开始时间而言)

    clipboard.pngclipboard.png
    对例4进行优化:限定3s时间内连续输入正确
     const code = [
       "ArrowUp",
       "ArrowUp",
       "ArrowDown",
       "ArrowDown",
       "ArrowLeft",
       "ArrowRight",
       "ArrowLeft",
       "ArrowRight",
       "KeyB",
       "KeyA",
       "KeyB",
       "KeyA"
     ]
    
    Rx.Observable.fromEvent(document, 'keyup')
    .map(e => e.code)
    .bufferToggle(Rx.Observable.timer(0, 3000), i=>Rx.Observable.interval(3000))
    .subscribe(last12key => {
    console.log(last12key);
    if (_.isEqual(last12key, code)) {
    console.log('隐藏的彩蛋 \(^o^)/~')
    }
    });
    

    高阶Observable

    QQ图片20190417115019.pngQQ图片20190417115019.png

    高阶Observable和一阶Observable的关系
    正如二维数组和一维数组的关系

    相关的操作符

    打平:concatAll、mergeAll、zipAll、combineAll、forkJoin、switch、exhaust
    组合:windowTime、windowCount、windowToggle 和window、groupBy分组
    (前四个是按顺序分组,最后一个打乱了顺序)
    转化:concatMap、mergeMap(flatMap)、switchMap、exhaustMap、mergeScan
    cancatMap=一对多的map+concatAll
    映射:concatMapTo、mergeMapTo、switchMapTo
    生成高阶函数

    const ho$ = Rx.Observable.interval(1000)
      .take(2)
      .concat(Rx.Observable.never())  // 添加了一个never数据流
      .map(x => Rx.Observable.interval(1500).map(y => x+':'+y).take(3));
    ho$.subscribe(
      console.log,
      null,
      () => console.log('complete')
    );
    
    // 打平
    ho$.zipAll().subscribe(
      console.log,
      null,
      () => console.log('complete')
    );
    
    

    例4:拖拽

    const box = document.querySelector('.box');
    const mousedown$ = Rx.Observable.fromEvent(box, 'mousedown');
    const mousemove$ = Rx.Observable.fromEvent(box, 'mousemove');
    const mouseup$ = Rx.Observable.fromEvent(box, 'mouseup');
    const mouseout$ = Rx.Observable.fromEvent(box, 'mouseout$');
    
    mousedown$.mergeMap((md) => {
      const stop$ = mouseup$.merge(mouseout$);
      return mousemove$.takeUntil(stop$).map((mm) =>{
        return {
          target: md.target,
          x: mm.clientX - md.offsetX,
          y: mm.clientY - md.offsetY
        }
      });
    }).subscribe((obj) => {
      console.log(obj);
      obj.target.style.top = obj.y + 'px';
      obj.target.style.left = obj.x + 'px';
    });
    

    综上:在RxJS中,创建类操作符是数据流的源头,其余所有操作符最重要的三类就是合并类、过滤类和转化类。不夸张地说,使⽤RxJS解决问题绝大部分时间就是在使⽤这三种操作符

    多播

    Observable和Observer的关系,就是前者在播放内容,后者在收听内容。播放内容的⽅式分为三种:

    • 单播(unicast):微信发给朋友,只有一个接收者
    • ⼴播(broadcast):朋友圈广告,所有人都能看得见
    • 多播(multicast):群聊天,发给一群人,只有选中的朋友才能看见
    clipboard.pngclipboard.png

    前面的例⼦⼤都是单播
    RxJS是⽀持⼀个Observable被多次subscribe的,所以,RxJS⽀持多播,但是,表⾯上看到的是多播,实质上还是单播

    const tick$ = Rx.Observable.interval(1000).take(3);
    
    tick$.subscribe(value => console.log('observer 1: ' + value));
    
    setTimeout(() => {
        tick$.subscribe(value => console.log('observer 2: ' + value));
    }, 2000);
    
    

    第⼆个Observer依然接收到了0、1、2总共三个数据。为什么会是这样的结果?因为interval这个操作符产⽣的是⼀个Cold Observable对象。
    Cold Observable,就是每次被subscribe都产⽣⼀个全新的数据序列的数据流,例如对interval产⽣的Observable对象每subscribe⼀次,都会产⽣⼀个全新的递增整数序列,从0开始产⽣Hot Observable:fromPromise、fromEvent、fromEventPattern就是异步的创建操作符真正的多播,必定是⽆论有多少Observer来subscribe,推给Observer的都是⼀样的数据源把Cold Observable变成Hot Observable,用的是Subject

    Subject

    clipboard.pngclipboard.png
    var interval$ = Rx.Observable.interval(500);
    
    const subject$ = new Rx.Subject();
        interval$.subscribe(subject$);
        subject$.map(val=>'a'+val).subscribe(x => console.log(x));
        setTimeout(() => {
          subject$.map(val=>'b'+val).subscribe(x => console.log(x));
        }, 1500);
    
    

    Subject不能重复使⽤
    Subject可以有多个上游

    例5:scan管理react状态

    class Counter extends React.Component {
      state = {count: 0}
      onIncrement() {
        this.setState({count: this.state.count + 1});
      }
      onDecrement() {
        this.setState({count: this.state.count - 1});
      }
      render() {
        return (
          <CounterView
            count={this.state.count}
            onIncrement={this.onIncrement.bind(this)}
            onDecrement={this.onDecrement.bind(this)}
          />
        );
      }
    }
    export default Counter;
    
    // subject作为桥梁进行状态维护
    class RxCounter extends React.Component {
      constructor() {
        super(...arguments);
        this.state = {count: 0};
        this.counter = new Subject();
        const observer = value => this.setState({count: value});
        this.counter.scan((result, inc) => result + inc, 0)
        .subscribe(observer);
      }
      render() {
        return <CounterView
          count={this.state.count}
          onIncrement={()=> this.counter.next(1)}
          onDecrement={()=> this.counter.next(-1)}
        />
      }
    }
    export default RxCounter;
    
    

    例6:买房放租

    const house$ = new Rx.Subject();
    const housecount$ = house$.scan((has, one) => has = has+one, 0).startWith(0);
    
    const month$ = Rx.Observable.interval(1000);
    const salary$ = month$.mapTo(1);
    const rent$ = month$.withLatestFrom(housecount$).map(arr=>arr[1]*0.5);
    
    // 月收入累加
    const income$ = salary$.merge(rent$);
    
    const cash$ = income$.scan((has, one)=>{
      has = has + one;
      if (has >= 100) {
        has -= 100;
        console.log('买房啦');
        house$.next(1);
      }
      return has;
    }, 0)
    
    cash$.subscribe(
      (data)=>{
        console.log('进账,余额:',data)
      },
      null,
      ()=>{
        console.log('complete');
      }
    )
    
    

    相关文章

      网友评论

        本文标题:Rxjs入门与初步应用

        本文链接:https://www.haomeiwen.com/subject/gkstnqtx.html