Creating Observables
An Observable can be created with new Observable, but in practice creation operators such as of, from, and interval are used far more often.
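To make the idea of a creation operator concrete, the sketch below hand-rolls tiny stand-ins for of and from on top of a bare-bones observable type. MiniObserver, MiniObservable, miniOf, miniFrom, and collect are hypothetical names invented for this illustration. They are not the RxJS implementations, which also deal with errors, unsubscription, and schedulers.

```typescript
// A deliberately tiny model of an Observable: a function that, when called
// with an observer, pushes values into the observer's callbacks.
// Every name in this block is hypothetical and not part of RxJS.
interface MiniObserver<T> {
  next: (value: T) => void;
  complete: () => void;
}

type MiniObservable<T> = (observer: MiniObserver<T>) => void;

// Rough analogue of of(a, b, c): emit each argument in order, then complete.
function miniOf<T>(...values: T[]): MiniObservable<T> {
  return (observer) => {
    for (const value of values) {
      observer.next(value);
    }
    observer.complete();
  };
}

// Rough analogue of from(array): emit each array item in order, then complete.
function miniFrom<T>(items: ReadonlyArray<T>): MiniObservable<T> {
  return (observer) => {
    for (const item of items) {
      observer.next(item);
    }
    observer.complete();
  };
}

// Helper that "subscribes" and records every notification as a string.
function collect<T>(source: MiniObservable<T>): string[] {
  const log: string[] = [];
  source({
    next: (value) => { log.push(`next ${String(value)}`); },
    complete: () => { log.push("complete"); },
  });
  return log;
}

const ofLog = collect(miniOf(1, 2, 3));
const fromLog = collect(miniFrom(["a", "b"]));
console.log(ofLog.join(", "));   // next 1, next 2, next 3, complete
console.log(fromLog.join(", ")); // next a, next b, complete
```

The point of the sketch is only that a creation operator is a function returning something subscribable, which saves writing the new Observable boilerplate by hand.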
Observer
An Observer receives the values pushed by an Observable. It is a set of callback functions, namely the callbacks passed to Observable.subscribe.
Observable.subscribe(x => console.log(x))
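The subscribe method accepts three callbacks; the one shown above is the most commonly used.
- Next: receives each value pushed by the Observable
- Error: receives the error object
- Complete: fires when the Observable finishes normally and receives no value; it does not fire if an error was pushed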
import { Observable } from 'rxjs';
newObservable() {
  const observable$ = new Observable(subscriber => {
    subscriber.next(1)
    // subscriber.error(new Error('Something went wrong'))
    subscriber.next(2)
    subscriber.complete();
    subscriber.next(3) // ignored: the Observable has already completed
  })
  observable$.subscribe(res => {
    console.log('res', res)
  }, error => {
    console.log('error', error)
  }, () => {
    console.log('complete')
  })
}
//res 1
//res 2
//complete
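Note: once an error is pushed, no further notifications are delivered, including complete. The same holds after complete, which is why next(3) in the code above is never received.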
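The rule in the note can be modelled with a small guard around the observer callbacks. The following is a minimal sketch under stated assumptions: Handlers, GuardedSubscriber, and run are hypothetical names made up for illustration, and this is not how RxJS itself is implemented, only a model of the behaviour described above.

```typescript
// Hypothetical wrapper illustrating the contract: after error() or
// complete(), every later notification is silently dropped.
interface Handlers<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

class GuardedSubscriber<T> {
  private stopped = false;
  private readonly handlers: Handlers<T>;

  constructor(handlers: Handlers<T>) {
    this.handlers = handlers;
  }

  next(value: T): void {
    if (!this.stopped) {
      this.handlers.next(value);
    }
  }

  error(err: unknown): void {
    if (!this.stopped) {
      this.stopped = true; // terminal: nothing else gets through
      this.handlers.error(err);
    }
  }

  complete(): void {
    if (!this.stopped) {
      this.stopped = true; // terminal: nothing else gets through
      this.handlers.complete();
    }
  }
}

// Runs a producer against a guarded subscriber and returns what was received.
function run(producer: (s: GuardedSubscriber<number>) => void): string[] {
  const log: string[] = [];
  producer(new GuardedSubscriber<number>({
    next: (v) => { log.push(`res ${v}`); },
    error: (e) => { log.push(`error ${String(e)}`); },
    complete: () => { log.push("complete"); },
  }));
  return log;
}

// Completing first: the trailing next(3) is dropped.
const completedLog = run((s) => { s.next(1); s.next(2); s.complete(); s.next(3); });

// Erroring first: next(2) and complete() are both dropped.
const erroredLog = run((s) => { s.next(1); s.error("boom"); s.next(2); s.complete(); });

console.log(completedLog.join(", ")); // res 1, res 2, complete
console.log(erroredLog.join(", "));   // res 1, error boom
```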
- Errors can be caught with try/catch
const observable = new Observable(subscriber => {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    // forward the exception to the observer's error callback
    subscriber.error(err)
  }
})
- The full form
subscribe also accepts an observer object. Any handler that is not needed can simply be left out.
observable$.subscribe({
  next(res) {
    console.log('res', res)
  },
  error(error) {
    console.log('error', error)
  },
  complete() {
    console.log('complete')
  }
})
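One way to picture why handlers can be omitted is to imagine the missing ones being filled in with do-nothing defaults. The sketch below does exactly that. FullObserver and withDefaults are hypothetical names for this illustration, and the no-op default is a simplifying assumption: real RxJS does not silently swallow an error that has no error handler.

```typescript
// Hypothetical observer shape; RxJS ships its own Observer types.
interface FullObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Turn an observer with any subset of handlers into a complete one.
// Missing handlers become no-ops (an assumption made for this sketch).
function withDefaults<T>(partial: Partial<FullObserver<T>>): FullObserver<T> {
  return {
    next: (value) => {
      if (partial.next) {
        partial.next(value);
      }
    },
    error: (err) => {
      if (partial.error) {
        partial.error(err);
      }
    },
    complete: () => {
      if (partial.complete) {
        partial.complete();
      }
    },
  };
}

const seen: string[] = [];

// Only a next handler is supplied.
const observer = withDefaults<number>({
  next: (value) => { seen.push(`res ${value}`); },
});

observer.next(1);
observer.next(2);
observer.complete(); // no complete handler was given, so nothing happens

console.log(seen.join(", ")); // res 1, res 2
```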
- Delivering values synchronously
newObservable() {
  const observable$ = new Observable(subscriber => {
    subscriber.next(1)
    subscriber.next(2)
    subscriber.next(3)
  })
  console.log('before')
  // the producer runs right here, inside the subscribe call
  observable$.subscribe(res => {
    console.log('res', res)
  })
  console.log('after')
}
//before
//res 1
//res 2
//res 3
//after
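The ordering above follows from the fact that subscribing runs the producer function immediately, on the caller's stack. A minimal model of that behaviour might look like the sketch below. SyncObservable is a hypothetical class written for this illustration, not the RxJS Observable, and it supports only a next callback.

```typescript
// Hypothetical minimal model: subscribing just calls the stored producer.
class SyncObservable<T> {
  private readonly producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  subscribe(next: (value: T) => void): void {
    // Runs immediately and returns only after the producer has finished.
    this.producer(next);
  }
}

const order: string[] = [];

const numbers = new SyncObservable<number>((next) => {
  next(1);
  next(2);
  next(3);
});

order.push("before");
numbers.subscribe((value) => { order.push(`res ${value}`); });
order.push("after");

console.log(order.join(", ")); // before, res 1, res 2, res 3, after
```

Nothing runs when the SyncObservable is constructed; the producer executes only when subscribe is called, which matches the lazy behaviour seen in the example.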
- Delivering values asynchronously
newObservable() {
  const observable$ = new Observable(subscriber => {
    subscriber.next(1)
    subscriber.next(2)
    subscriber.next(3)
    // pushed later, after subscribe has already returned
    setTimeout(() => {
      subscriber.next(300);
    }, 1000)
  })
  console.log('before')
  observable$.subscribe(res => {
    console.log('res', res)
  })
  console.log('after')
}
//before
//res 1
//res 2
//res 3
//after
//res 300 (after 1 second)
Subscription
import { interval } from 'rxjs';
newObservable() {
  const observable = interval(1000);
  const subscription = observable.subscribe(x => console.log(x))
  // stop receiving values after 3 seconds
  setTimeout(() => {
    subscription.unsubscribe();
  }, 3000)
}
Subscriptions can be grouped together, so that a single unsubscribe() call on one Subscription cancels several subscriptions at once.
import { interval } from 'rxjs'
newObservable() {
  const observable1 = interval(800);
  const observable2 = interval(400);
  const subscription = observable1.subscribe(x => console.log('first:' + x))
  const childSubscription = observable2.subscribe(x => console.log('second:' + x))
  subscription.add(childSubscription);
  setTimeout(() => {
    // cancels both subscription and childSubscription
    subscription.unsubscribe();
  }, 3000)
}
Because subscription.add(childSubscription) attaches childSubscription to subscription, unsubscribing the parent is enough to cancel both.
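The parent and child relationship can be sketched as a small composite object. MiniSubscription below is a hypothetical class written for illustration and is not the RxJS Subscription class; it only models the behaviour described here, assuming each subscription owns one teardown function and a list of children.

```typescript
// Hypothetical sketch of a composite subscription.
class MiniSubscription {
  closed = false;
  private readonly teardown: () => void;
  private readonly children: MiniSubscription[] = [];

  constructor(teardown: () => void) {
    this.teardown = teardown;
  }

  // Attach a child so it is torn down together with this subscription.
  add(child: MiniSubscription): void {
    if (this.closed) {
      child.unsubscribe(); // parent already closed: tear the child down now
    } else {
      this.children.push(child);
    }
  }

  unsubscribe(): void {
    if (this.closed) {
      return; // calling unsubscribe twice has no further effect
    }
    this.closed = true;
    this.teardown();
    for (const child of this.children) {
      child.unsubscribe();
    }
  }
}

const events: string[] = [];
const parentSub = new MiniSubscription(() => { events.push("parent stopped"); });
const childSub = new MiniSubscription(() => { events.push("child stopped"); });

parentSub.add(childSub);
parentSub.unsubscribe(); // tears down the parent, then the child
parentSub.unsubscribe(); // second call is ignored

console.log(events.join(", ")); // parent stopped, child stopped
```

Grouping like this is convenient in a component that opens several subscriptions and needs to release all of them in one place when it is destroyed.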
- Imports
Creation operators are exported from rxjs; pipeable (transforming) operators are exported from rxjs/operators.
import { AsyncSubject, BehaviorSubject, combineLatest, concat, ConnectableObservable, EMPTY, empty, forkJoin, from, fromEvent, iif, interval, merge, observable, Observable, of, partition, race, range, ReplaySubject, Subject, throwError, timer, zip } from 'rxjs';
import { audit, auditTime, buffer, bufferCount, bufferTime, bufferToggle, bufferWhen, catchError, combineAll, concatAll, concatMap, concatMapTo, count, debounce, debounceTime, defaultIfEmpty, delay, delayWhen, distinct, distinctUntilChanged, distinctUntilKeyChanged, elementAt, endWith, every, exhaust, exhaustMap, filter, find, findIndex, first, groupBy, ignoreElements, isEmpty, last, map, mapTo, max, mergeAll, mergeMap, mergeMapTo, mergeScan, multicast, pairwise, pluck, publish, publishBehavior, publishLast, publishReplay, reduce, refCount, retry, retryWhen, sample, sampleTime, scan, share, single, skip, skipLast, skipUntil, skipWhile, startWith, switchMap, switchMapTo, take, takeLast, takeUntil, takeWhile, tap, throttle, throttleTime, timeInterval, timeout, timeoutWith, timestamp, toArray, withLatestFrom } from 'rxjs/operators';