核心概念1:observable
observable是一个可观察对象,它会产生数据流,并push到观察者
import { Observable } from 'rxjs';
//创建一个可观察对象
let observable=Observable.create(x=>{
try{
x.next(1)//发射数据
x.next(2)//发射数据
x.next(3)//发射数据
x.complete()//结束可观察对象,后面的代码将不会执行
x.next(4)//永远不会发生
}catch(err){
x.error(err)//如果发生错误,发射错误数据
}
})
核心概念2:observer
observer是一个观察者,观察来自observable(可观察对象的数据)。订阅observable就产生了一个observer。有三个参数,第一个是正常数据的执行逻辑,第二个是发生错误后的执行逻辑,第三个是可观察对象结束(complete)后的执行逻辑
//let observer=observable.subscribe(x=>console.log(x))
//订阅可观察对象产生一个观察者
let observer=observable.subscribe(x=>{
console.log(x)
},err=>{
console.log(err)
},()=>{
console.log('complete')
})
- 取消订阅
import { Observable } from 'rxjs';
let observable=Observable.create(x=>{
x.next(1)
x.next(2)
setInterval(()=>{//每秒钟发射一个数据
x.next(1)
},1000)
})
let observer=observable.subscribe(x=>{
console.log(`第一个订阅者:${x}`)
})
let observer2=observable.subscribe(x=>{
console.log(`第二个订阅者:${x}`)
})
setTimeout(()=>{//6秒后结束1订阅者结束订阅
observer.unsubscribe();
},6000)
-
同时取消订阅
上面的例子只能取消的订阅,2继续订阅数据。如果像同时取消订阅者订阅数据,可以将2加入到1中。
import { Observable } from 'rxjs';
let observable=Observable.create(x=>{
x.next(1)
x.next(2)
setInterval(()=>{//每秒钟发射一个数据
x.next(1)
},1000)
})
let observer=observable.subscribe(x=>{
console.log(`第一个订阅者:${x}`)
})
let observer2=observable.subscribe(x=>{
console.log(`第二个订阅者:${x}`)
})
//将observer2加入都observer1中。
observer.add(observer2)
setTimeout(()=>{//6秒后结束1订阅者结束订阅,同时2订阅者也结束订阅
observer.unsubscribe();
},6000)
hot observable/cold observable
- cold observable
上面我们创建的observable都是cold observable,它的特点是后续的订阅者能收到可观察对象之前的数据
- hot observable
hot observable的特点是只能接收之后的数据。我们可以通过share函数来将cold observable转换成hot observable。当然也有专门的hot observable创建方式,例如fromEvent函数
- share函数
import { Observable } from 'rxjs';
import {share} from 'rxjs/operators';//导入share操作符
let observable=Observable.create(x=>{
x.next(1)
x.next(2)
setInterval(()=>{
x.next(1)
},1000)
}).pipe(share())//调用share函数将cold observable专程hot observable
let observer=observable.subscribe(x=>{
console.log(`第一个订阅者:${x}`)
})
setTimeOut(()=>{
let observer2=observable.subscribe(x=>{
console.log(`第二个订阅者:${x}`)
})
observer.add(observer2)
},3000)
setTimeout(()=>{
observer.unsubscribe();
},6000)
- fromEvent
import {fromEvent} from 'rxjs'//导入fromEvent
let observable=fromEvent(document,"click");//通过fromEvent创建一个页面点击数据源
let observer=observable.subscribe(x=>{
console.log(`第一个订阅者:${x}`)
})
setTimeout(()=>{
let observer2=observable.subscribe(x=>{
console.log(`第二个订阅者:${x}`)
})
},3000)
可以看到刚开始点击只会产生一条数据,3秒后2订阅开始结束点击数据,但是之前点击的数据2订阅并接受不到。所以fromEvent产色生的是一个hot observable
核心概念3:subject
subject-主题-是一类特殊的observable。subject和observable的特点就是普通observable创建出来后只能被读取数据(可能描述的不是很好,实际上是push),而subject能写入数据。
import {Subject} from 'rxjs'
const subject=new Subject()
let observer1=subject.subscribe(x=>{
console.log(`第一个订阅者:${x}`)
})
subject.next(1)
subject.next(2)
let observer2=subject.subscribe(x=>{
console.log(`第二个订阅者:${x}`)
})
subject.next(3)
observer1.add(observer2)
observer1.unsubscribe()
subject.next(4)
上传例子创建了一个主题,然后订阅1,然后往主题中发射了2个数据,然后订阅2,然后在发射了1个数据,然后将2加入1,然后取消订阅1同时取消订阅2,最后又发射了一个数据。
利用subject做一个搜索的案例
截图1558845936.png不考虑后台的优化,这里专注于前端。需求是在输入框中输入文字,能检测文字变化然后自动搜索相关信息展示在搜索框下面。要求,尽可能的减少重复请求。
此处设计了操作符的概念(debounceTime,distinctUntilChanged,switchMap),后面会具体展开讲
- 后台接口
后台接口这里使用json-server来创建一个简单的restapi
#db.json
{
"users":[
{"name":"1"},
{"name":"12"},
{"name":"123"},
{"name":"1234"},
{"name":"12345"},
{"name":"123456"},
{"name":"1234567"},
{"name":"12345678"}
]
}
#cmd
json-server --watch db.json
截图1558846216.png
- html
<!--搜索框,只要一输入就调用search方法,并把值传递过去-->
<input type="text" #searchBox (input)="search(searchBox.value)">
<div class='users'>
<!--使用angular的同步管道,将后台查询到的可观察数据展示出来-->
<div class="user" *ngFor="let user of users$ | async">
{{user.name}}
</div>
</div>
- scss
.users{
padding: 10px;
background-color: #ddd;
.user{
background-color: #fff;
margin-bottom: 10px;
}
}
- ts
import { Component } from '@angular/core';
import { Subject, Observable } from 'rxjs';
import { switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.scss']
})
export class AppComponent {
subject = new Subject()//创建主题
users$:Observable<any[]>;//生命查询到的数据,它不是一个传统的list,而是一个obervable,订阅的操作交给了angular的async异步管道
constructor(private http: HttpClient) {//注入http客户端
this.users$=this.subject.pipe(//此处订阅主题,并将输入值通过switchMap转换后变成需要的数据
debounceTime(300),//debounceTime操作标识延迟300毫秒发射数据(当用户连续输入一些文字的时候只会发射最后的一次数据。因为通常用户在输入的时候会一下子输入好多的文字,而只有最后一次他停下来的时候才是需要搜索的关键词。所以此处延迟300毫米。这个值不能太大,太大了延迟影响用户体验,太小了起不到优化的作用)
distinctUntilChanged(),//distinctUntilChanged操作符只有值改变了才往下发射数据(假设用户快速数输入了123,然后又快速删除了刚刚输入的数据,这个时候查询结果应该是和之前是一样的,所以不需要再次查询后台了。)
switchMap(x => {//switchMap操作符拦截输入数据字符串,然后将后台查询得到的observable对象发射出去
return this.http.get<any>(`http://localhost:3000/users?q=${x}`)//具体查询数据使用json-server提供的全文检索q
})
)
}
search(value) {//此处对用应了html的输入调用方法。这里输入后将数据发射到主题中。有主题的订阅者(构造函数中)去处理具体的逻辑
this.subject.next(value)
}
}
[图片上传失败...(image-a30688-1558880758639)]
subject /behaviorSubject/
- subject
普通的主题的订阅者只能收到它创建后的数据
import { Subject } from 'rxjs';
const subject=new Subject()
let o1=subject.subscribe(x=>{
console.log(`1:${x}`)
})
subject.next(1)
let o2=subject.subscribe(x=>{
console.log(`2:${x}`)
})
subject.next(2)
- behaviorSubject
behaviourSubject能收到它创建之前的一个数据。它需要接受一个参数,这个参数作为第一个数据发射进主题,同时指定了数据类型,这里传入0,说明只接受number型
import { BehaviorSubject } from 'rxjs';
const subject=new BehaviorSubject(0)//这里传入0,说明只接受number型,并且发射了第一个数据0
let o1=subject.subscribe(x=>{
console.log(`1:${x}`)
})
subject.next(1)
let o2=subject.subscribe(x=>{
console.log(`2:${x}`)
})
subject.next(2)
- replaySubject
behaviorSubject能收到创建之前的一个数据,而replaySubject能指定一个数字,表示新创建的订阅者能收到之前的最多n个数据
import { ReplaySubject } from 'rxjs';
const subject=new ReplaySubject(2)//此处我们指定了数字2,代表能收到之前的2个数据
let o1=subject.subscribe(x=>{
console.log(`1:${x}`)
})
subject.next(1)
subject.next(2)
let o2=subject.subscribe(x=>{
console.log(`2:${x}`)
})
subject.next(3)
replaySubject创建过程还可以指定第二个参数,表示多少毫秒内的数据
import { ReplaySubject } from 'rxjs';
const subject=new ReplaySubject(10,200)//最大接受10个数据,但是只接受200毫秒内的数据
let i=0
setInterval(()=>{
subject.next(i++)
},100)
setTimeout(()=>{
subject.subscribe(x=>{
console.log(x)
})
},1000)
- asyncSubject
asyncSubject只接受最后一个数据,并且是在主题结束了以后才会发射。
import { AsyncSubject } from 'rxjs';
let subject=new AsyncSubject()
subject.subscribe(x=>{
console.log(x)
})
subject.next(1)
subject.next(2)
subject.next(3)
subject.next(4)
subject.complete()//只有调用了主题结束了,订阅者才能收到消息,并且只收到最后一个消息
网友评论