rxjs-observable/observer/subject

作者: bugWriter_y | 来源:发表于2019-05-27 06:54 被阅读28次

    核心概念1:observable

    observable是一个可观察对象,它会产生数据流,并push到观察者

    import { Observable } from 'rxjs';
    //创建一个可观察对象
    let observable=Observable.create(x=>{
        try{
            x.next(1)//发射数据
            x.next(2)//发射数据
            x.next(3)//发射数据
            x.complete()//结束可观察对象,后面的代码将不会执行
            x.next(4)//永远不会发生
        }catch(err){
            x.error(err)//如果发生错误,发射错误数据
        }
    })
    
    
    

    核心概念2:observer

    observer是一个观察者,观察来自observable(可观察对象的数据)。订阅observable就产生了一个observer。有三个参数,第一个是正常数据的执行逻辑,第二个是发生错误后的执行逻辑,第三个是可观察对象结束(complete)后的执行逻辑

    //let observer=observable.subscribe(x=>console.log(x))
    //订阅可观察对象产生一个观察者
    let observer=observable.subscribe(x=>{
        console.log(x)
    },err=>{
        console.log(err)
    },()=>{
        console.log('complete')
    })
    
    1. 取消订阅
    import { Observable } from 'rxjs';
    let observable=Observable.create(x=>{
        x.next(1)
        x.next(2)
        setInterval(()=>{//每秒钟发射一个数据
            x.next(1)
        },1000)
    })
    
    let observer=observable.subscribe(x=>{
        console.log(`第一个订阅者:${x}`)
    })
    let observer2=observable.subscribe(x=>{
        console.log(`第二个订阅者:${x}`)
    })
    setTimeout(()=>{//6秒后结束1订阅者结束订阅
        observer.unsubscribe();
    },6000)
    
    1. 同时取消订阅

      上面的例子只能取消的订阅,2继续订阅数据。如果像同时取消订阅者订阅数据,可以将2加入到1中。

    import { Observable } from 'rxjs';
    let observable=Observable.create(x=>{
        x.next(1)
        x.next(2)
        setInterval(()=>{//每秒钟发射一个数据
            x.next(1)
        },1000)
    })
    
    let observer=observable.subscribe(x=>{
        console.log(`第一个订阅者:${x}`)
    })
    let observer2=observable.subscribe(x=>{
        console.log(`第二个订阅者:${x}`)
    })
    //将observer2加入都observer1中。
    observer.add(observer2)
    setTimeout(()=>{//6秒后结束1订阅者结束订阅,同时2订阅者也结束订阅
        observer.unsubscribe();
    },6000)
    
    hot observable/cold observable
    1. cold observable

    上面我们创建的observable都是cold observable,它的特点是后续的订阅者能收到可观察对象之前的数据

    1. hot observable

    hot observable的特点是只能接收之后的数据。我们可以通过share函数来将cold observable转换成hot observable。当然也有专门的hot observable创建方式,例如fromEvent函数

    • share函数
    import { Observable } from 'rxjs';
    import {share} from 'rxjs/operators';//导入share操作符
    let observable=Observable.create(x=>{
        x.next(1)
        x.next(2)
        setInterval(()=>{
            x.next(1)
        },1000)
    }).pipe(share())//调用share函数将cold observable专程hot observable
    
    let observer=observable.subscribe(x=>{
        console.log(`第一个订阅者:${x}`)
    })
    setTimeOut(()=>{
        let observer2=observable.subscribe(x=>{
            console.log(`第二个订阅者:${x}`)
        })  
        observer.add(observer2)
    },3000)
    setTimeout(()=>{
        observer.unsubscribe();
    },6000)
    
    • fromEvent
    import {fromEvent} from 'rxjs'//导入fromEvent
    let observable=fromEvent(document,"click");//通过fromEvent创建一个页面点击数据源
    
    let observer=observable.subscribe(x=>{
        console.log(`第一个订阅者:${x}`)
    })
    setTimeout(()=>{
        let observer2=observable.subscribe(x=>{
            console.log(`第二个订阅者:${x}`)
        })  
    },3000)
    

    可以看到刚开始点击只会产生一条数据,3秒后2订阅开始结束点击数据,但是之前点击的数据2订阅并接受不到。所以fromEvent产色生的是一个hot observable

    核心概念3:subject

    subject-主题-是一类特殊的observable。subject和observable的特点就是普通observable创建出来后只能被读取数据(可能描述的不是很好,实际上是push),而subject能写入数据。

    import {Subject} from 'rxjs'
    
    const subject=new Subject()
    let observer1=subject.subscribe(x=>{
        console.log(`第一个订阅者:${x}`)
    })
    subject.next(1)
    subject.next(2)
    let observer2=subject.subscribe(x=>{
        console.log(`第二个订阅者:${x}`)
    })
    subject.next(3)
    observer1.add(observer2)
    observer1.unsubscribe()
    subject.next(4)
    

    上传例子创建了一个主题,然后订阅1,然后往主题中发射了2个数据,然后订阅2,然后在发射了1个数据,然后将2加入1,然后取消订阅1同时取消订阅2,最后又发射了一个数据。

    利用subject做一个搜索的案例

    不考虑后台的优化,这里专注于前端。需求是在输入框中输入文字,能检测文字变化然后自动搜索相关信息展示在搜索框下面。要求,尽可能的减少重复请求。

    此处设计了操作符的概念(debounceTime,distinctUntilChanged,switchMap),后面会具体展开讲

    截图1558845936.png
    1. 后台接口

    后台接口这里使用json-server来创建一个简单的restapi

    #db.json
    {
        "users":[
            {"name":"1"},
            {"name":"12"},
            {"name":"123"},
            {"name":"1234"},
            {"name":"12345"},
            {"name":"123456"},
            {"name":"1234567"},
            {"name":"12345678"}
        ]
    }
    
    #cmd
    json-server --watch db.json
    
    截图1558846216.png
    1. html
    <!--搜索框,只要一输入就调用search方法,并把值传递过去-->
    <input type="text" #searchBox (input)="search(searchBox.value)">
    <div class='users'>
      <!--使用angular的同步管道,将后台查询到的可观察数据展示出来-->
      <div class="user" *ngFor="let user of users$ | async">
        {{user.name}}
      </div>
    </div>
    
    1. scss
    .users{
        padding: 10px;
        background-color: #ddd;
        .user{
            background-color: #fff;
            margin-bottom: 10px;
        }
    }
    
    1. ts
    import { Component } from '@angular/core';
    import { Subject, Observable } from 'rxjs';
    import { switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';
    import { HttpClient } from '@angular/common/http';
    @Component({
      selector: 'app-root',
      templateUrl: './app.component.html',
      styleUrls: ['./app.component.scss']
    })
    export class AppComponent {
      subject = new Subject()//创建主题
      users$:Observable<any[]>;//生命查询到的数据,它不是一个传统的list,而是一个obervable,订阅的操作交给了angular的async异步管道
      constructor(private http: HttpClient) {//注入http客户端
        this.users$=this.subject.pipe(//此处订阅主题,并将输入值通过switchMap转换后变成需要的数据
          debounceTime(300),//debounceTime操作标识延迟300毫秒发射数据(当用户连续输入一些文字的时候只会发射最后的一次数据。因为通常用户在输入的时候会一下子输入好多的文字,而只有最后一次他停下来的时候才是需要搜索的关键词。所以此处延迟300毫米。这个值不能太大,太大了延迟影响用户体验,太小了起不到优化的作用)
          distinctUntilChanged(),//distinctUntilChanged操作符只有值改变了才往下发射数据(假设用户快速数输入了123,然后又快速删除了刚刚输入的数据,这个时候查询结果应该是和之前是一样的,所以不需要再次查询后台了。)
          switchMap(x => {//switchMap操作符拦截输入数据字符串,然后将后台查询得到的observable对象发射出去
            return this.http.get<any>(`http://localhost:3000/users?q=${x}`)//具体查询数据使用json-server提供的全文检索q
          })
        )
      }
      search(value) {//此处对用应了html的输入调用方法。这里输入后将数据发射到主题中。有主题的订阅者(构造函数中)去处理具体的逻辑
        this.subject.next(value)
      }
    }
    

    [图片上传失败...(image-a30688-1558880758639)]

    subject /behaviorSubject/
    1. subject

    普通的主题的订阅者只能收到它创建后的数据

    import { Subject } from 'rxjs';
    
    const subject=new Subject()
    let o1=subject.subscribe(x=>{
        console.log(`1:${x}`)
    })
    subject.next(1)
    let o2=subject.subscribe(x=>{
        console.log(`2:${x}`)
    })
    subject.next(2)
    
    
    1. behaviorSubject

    behaviourSubject能收到它创建之前的一个数据。它需要接受一个参数,这个参数作为第一个数据发射进主题,同时指定了数据类型,这里传入0,说明只接受number型

    import { BehaviorSubject } from 'rxjs';
    
    const subject=new BehaviorSubject(0)//这里传入0,说明只接受number型,并且发射了第一个数据0
    let o1=subject.subscribe(x=>{
        console.log(`1:${x}`)
    })
    subject.next(1)
    let o2=subject.subscribe(x=>{
        console.log(`2:${x}`)
    })
    subject.next(2)
    
    1. replaySubject

    behaviorSubject能收到创建之前的一个数据,而replaySubject能指定一个数字,表示新创建的订阅者能收到之前的最多n个数据

    import { ReplaySubject } from 'rxjs';
    const subject=new ReplaySubject(2)//此处我们指定了数字2,代表能收到之前的2个数据
    let o1=subject.subscribe(x=>{
        console.log(`1:${x}`)
    })
    subject.next(1)
    subject.next(2)
    let o2=subject.subscribe(x=>{
        console.log(`2:${x}`)
    })
    subject.next(3)
    

    replaySubject创建过程还可以指定第二个参数,表示多少毫秒内的数据

    import { ReplaySubject } from 'rxjs';
    const subject=new ReplaySubject(10,200)//最大接受10个数据,但是只接受200毫秒内的数据
    let i=0
    setInterval(()=>{
        subject.next(i++)
    },100)
    setTimeout(()=>{
        subject.subscribe(x=>{
            console.log(x)
        })
    },1000)
    
    1. asyncSubject

    asyncSubject只接受最后一个数据,并且是在主题结束了以后才会发射。

    import { AsyncSubject } from 'rxjs';
    let subject=new AsyncSubject()
    subject.subscribe(x=>{
      console.log(x)
    })
    
    subject.next(1)
    subject.next(2)
    subject.next(3)
    subject.next(4)
    subject.complete()//只有调用了主题结束了,订阅者才能收到消息,并且只收到最后一个消息
    

    相关文章

      网友评论

        本文标题:rxjs-observable/observer/subject

        本文链接:https://www.haomeiwen.com/subject/qgkqtctx.html