响应式编程简介
响应式编程是一种面向数据流和变化传播的编程方式,即可以在编程中很方便的表达静态或动态的数据流,相关的计算模型会自动将变化的值通过数据流进行传播。
Rxjs就是一个透过Observable组合各种非同步行为的Library,它是一个使用可观察数据流进行异步编程的编程接口,结合了观察者模式、迭代器模式和函数式编程。
Rxjs概念
Rxjs全称Reactive Extension for JavaScript,JavaScript的响应式扩展,响应式的思路就是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦有变化,就会执行事先安排好的各种转换和操作。
核心:可观察序列(Observable),订阅(subscribe),操作符(operators),调度(schedulers)
适用场景
- 同步和异步
- 获取与订阅
- 查询数据的组合
- 已有数据和未来数据的更新策略
Promise异步编程的缺点:1.单一数据模式;2.对异步事件的控制力很弱
Observable (建立与订阅)
observable在未被订阅之前只是个实例对象,不会有任何输出。在被订阅(subscribe)时进行数据操作,具有多种operators。operator是Observable的方法,可以对元素进行封装处理。一般是返回一个新的observable实例。
observer:用来订阅observable的实例对象,内含next,complete,error三个参数,next一定要有,其他两项可选。
subscription:observable订阅返回的实例,可以用来退订(unsubscribe),也可以与其他订阅合并。
// 建立(发布)
let mouseMove = Observable.fromEvent(DOM,'mousemove');
// 订阅
let subscription = mouseMove.subscribe(x => console.log(x));
// 移除
subscription.unsubscribe();
如果有一个observable内部还是一个observable,可以用mergeAll()将其展开,类似吧二维数组转成一位数组,一般会和map一起使用,所以通常简写为mergeMap,但在序列元素处理过程中我只想去最新处理的元素可以使用switchMap(),如果是需要保留正在处理的,让新的即将处理暂时等待旧的处理完再执行可以用exhaustMap()
Observable
.fromEvent(DOM,'click')
.switchMap(()=>
ajax('url')
)
对于请求的处理,可以设置在多少次请求失败之后再抛出异常。
const defaultData = {success:false,data:[]};
const getPostObservable = () =>
Observable.ajax('url')
.retry(3) // 若三次都请求失败再进入到catch
.catch(() => Observable.of(defaultData));
// 简化catch
.catch(() => [defaultData]);
- Rxjs使得我们能够完全依赖于声明语句来获取动态的值。
// js常规代码
let a = 3;
let b = a * 10;
a = 9
b = a * 10
console.log(b)
// rxjs代码
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';
const a = interval(1000).pipe(
map(_ => Math.random().toFixed(1)),
);
const b = a.pipe(
map(item => Number(item) * 10)
)
// 实时监听a依赖的变化
b.subscribe(v => console.log(v))
- Rxjs监听DOM事件
fromEvent将指定元素上的事件转换成事件流
const btn = document.getElementById('btn');
// js常规代码
let pre = new Date().getTime();
btn.addEventListener('click',function(event){
const now = new Date().getTime();
const silence = now - pre;
if(silence > 1000){
pre = now;
console.log(event);
}
})
// rxjs代码
// fromEvent将制定元素上的事件转换成事件流
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
const obs = fromEvent(btn,'click').pipe(
throttleTime(1000),
);
obs.subscribe(v => console.log(v))
- Rxjs处理网络请求
import { fromEvent,interval,from,merge } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
const btn = document.getElementById('btn')
// DOM事件流---
const click = fromEvent(btn,'click');
// 程序逻辑流---
const inner = interval(3000);
const request = from(fetch('https://api.github.com/users').then(res => res.json()))
// 合并事件流
const response = merge(
click,
inner,
).pipe(
mergeMap(_ => request)
)
response.subscribe(v => console.log(v))
操作符(Operators)
操作符是 Observable 类型上的方法,比如 .map(...)、.filter(...)、.merge(...),等等。其本质上是纯函数,当操作符被调用时,它们不会改变已经存在的 Observable 实例,是一个无副作用的操作。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。
操作符有着不同的用途,它们可作如下分类:创建、转换、过滤、组合、错误处理、工具,等等。这里介绍一些常用的操作符。
一、创造observabl类
- create
// RxJS v6+
import { Observable } from 'rxjs';
/*
创建在订阅函数中发出 'Hello' 和 'World' 的 observable 。
*/
const hello = Observable.create(function(observer) {
observer.next('Hello');
observer.next('World');
});
// 输出: 'Hello'...'World'
const subscribe = hello.subscribe(val => console.log(val));
- of
of类似于一个迭代器,将参数迭代然后发出。它接收任意多个参数,参数可以是任意类型,然后它会把这些参数逐个放入流中。
// RxJS v6+
import { of } from 'rxjs';
// 依次发出提供的任意数量的值
const source = of(1, 2, 3, 4, 5, { name: 'Brian' }, [1, 2, 3], function hello() {
return 'Hello';
});
// 输出: 1,2,3,4,5,{name: 'Brian}, [1,2,3], function hello() { return 'Hello' }
const subscribe = source.subscribe(val => console.log(val));
- from
from的参数必须是一个类数组(set,iterator,promise等),其他和of一样
// RxJS v6+
import { from } from 'rxjs';
// 将数组作为值的序列发出
const arraySource = from([1, 2, 3, 4, 5]);
// 输出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
- fromEvent
将事件转换成 observable 序列。
// RxJS v6+
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';
// 创建发出点击事件的 observable
const source = fromEvent(document, 'click');
// 映射成给定的事件时间戳
const example = source.pipe(map(event => `Event time: ${event.timeStamp}`));
// 输出 (示例中的数字以运行时为准): 'Event time: 7276.390000000001'
const subscribe = example.subscribe(val => console.log(val));
- fromPromise
创建由 promise 转换而来的 observable,并发出 promise 的结果。
import { of } from 'rxjs/observable/of';
import { fromPromise } from 'rxjs/observable/fromPromise';
import { mergeMap, catchError } from 'rxjs/operators';
// 基于输入来决定是 resolve 还是 reject 的示例 promise
const myPromise = willReject => {
return new Promise((resolve, reject) => {
if (willReject) {
reject('Rejected!');
}
resolve('Resolved!');
});
};
// 先发出 true,然后是 false
const source = of(true, false);
const example = source.pipe(
mergeMap(val =>
fromPromise(myPromise(val)).pipe(
// 捕获并优雅地处理 reject 的结果
catchError(error => of(`Error: ${error}`))
)
)
);
// 输出: 'Error: Rejected!', 'Resolved!'
const subscribe = example.subscribe(val => console.log(val));
- timer
timer有两个参数,第一个参数表示到发送第一个值的间隔时间,第二个参数表示从发送第二个参数开始,没发送一个值的间隔时间,如果第二个参数为空则发送第一个参数后,终止,执行complete函数。
// RxJS v6+
import { timer } from 'rxjs';
/*
timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
然后每2秒发出序列值
*/
const source = timer(1000, 2000);
// 输出: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));
- interval
基于给定时间间隔发出数字序列。相当于 timer(1000, 1000),也就是说初始等待时间和间隔时间是一样的。
// RxJS v6+
import { interval } from 'rxjs';
// 每1秒发出数字序列中的值
const source = interval(1000);
// 数字: 0,1,2,3,4,5....
const subscribe = source.subscribe(val => console.log(val));
二、合并创建器
一般有DOM事件流和程序逻辑流,我们可以对这些流进行不同形式的合并,创建一个新的流,常见的合并方式有三种:并联、串联、拉链。
- merge - 并联
// RxJS v6+
import { merge } from 'rxjs/operators';
import { interval, merge } from 'rxjs';
// 每2.5秒发出值
const first = interval(2500);
// 每1秒发出值
const second = interval(1000);
// 作为实例方法使用
const example = first.pipe(merge(second));
// 输出: 0,1,0,2....
const subscribe = example.subscribe(val => console.log(val));
// 从一个 observable 中发出输出值
const example1 = merge(
first.pipe(mapTo('FIRST!')),
second.pipe(mapTo('SECOND!')),
);
// 输出: "SECOND!", "FIRST!"
const subscribe1 = example1.subscribe(val => console.log(val));
- concat - 串联
例如;两个流中的内容被按照顺序放进了输出流中,前面的流尚未结束时,后面的流就会一直等待。
// RxJS v6+
import { of, concat } from 'rxjs';
// 发出 1,2,3
const sourceOne = of(1, 2, 3);
// 发出 4,5,6
const sourceTwo = of(4, 5, 6);
// 作为静态方法使用
const example = concat(sourceOne, sourceTwo);
// 输出: 1,2,3,4,5,6
const subscribe = example.subscribe(val => console.log(val));
- zip - 拉链
在所有 observables 发出后,将它们的值作为数组发出,通常用于合并两个数据有对应关系的数据源。
// RxJS v6+
import { delay } from 'rxjs/operators';
import { of, zip } from 'rxjs';
const sourceOne = of('Hello');
const sourceTwo = of('World!');
// 一直等到所有 observables 都发出一个值,才将所有值作为数组发出
const example = zip(
sourceOne,
sourceTwo.pipe(delay(1000)),
);
// 输出: ["Hello", "World!"]
const subscribe = example.subscribe(val => console.log(val));
三、错误处理
- retry - 失败时重试
有些错误是可以通过重试进行恢复的,比如临时性的网络丢包。甚至一些流程的设计还会故意借助重试机制,比如当你发起请求时,如果后端发现你没有登录过,就会给你一个 401 错误,然后你可以完成登录并重新开始整个流程。
retry 操作符就是负责在失败时自动发起重试的,它可以接受一个参数,用来指定最大重试次数。
// RxJS v6+
import { interval, of, throwError } from 'rxjs';
import { mergeMap, retry } from 'rxjs/operators';
// 每1秒发出值
const source = interval(1000);
const example = source.pipe(
mergeMap(val => {
// 抛出错误以进行演示
if (val > 5) {
return throwError('Error!');
}
return of(val);
}),
// 出错的话可以重试2次
retry(2)
);
/*
输出:
0..1..2..3..4..5..
0..1..2..3..4..5..
0..1..2..3..4..5..
"Error!: Retried 2 times then quit!"
*/
const subscribe = example.subscribe({
next: val => console.log(val),
error: val => console.log(`${val}: Retried 2 times then quit!`)
});
四、选择器类
- take
有的时候我门希望获取Observable前几个数然后结束(执行complete方法)
var source = interval(1000);
var example = source.pipe(take(3));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete
取得首次点击的坐标
<div id="locationDisplay">
Where would you click first?
</div>
// RxJS v6+
import { fromEvent } from 'rxjs';
import { take, tap } from 'rxjs/operators';
const oneClickEvent = fromEvent(document, 'click').pipe(
take(1),
tap(v => {
document.getElementById(
'locationDisplay'
).innerHTML = `Your first click was on location ${v.screenX}:${v.screenY}`;
})
);
const subscribe = oneClickEvent.subscribe();
- takeUntil
发出值,直到提供的 observable 发出值,它便完成。
// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
// 每1秒发出值
const source = interval(1000);
// 5秒后发出值
const timer$ = timer(5000);
// 当5秒后 timer 发出值时, source 则完成
const example = source.pipe(takeUntil(timer$));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
- filter
过滤出符合给定条件的值。
// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
// 发出 ({name: 'Joe', age: 31}, {name: 'Bob', age:25})
const source = from([{ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 }]);
// 过滤掉年龄小于30岁的人
const example = source.pipe(filter(person => person.age >= 30));
// 输出: "Over 30: Joe"
const subscribe = example.subscribe(val => console.log(`Over 30: ${val.name}`));
- first
发出第一个值或第一个通过给定表达式的值,与之相对应的还有last。
// RxJS v6+
import { from } from 'rxjs';
import { first } from 'rxjs/operators';
const source = from([1, 2, 3, 4, 5]);
// 没有参数则发出第一个值
const example = source.pipe(first());
// 发出通过测试的第一项
const example = source.pipe(first(num => num === 5));
// 输出: "First to pass test: 5"
// 没有值通过的话则发出默认值
const example = source.pipe(first(val => val > 5, 'Nothing'));
// 输出: 'Nothing'
const subscribe = example.subscribe(val =>
console.log(`First to pass test: ${val}`)
);
- skip
跳过N个(由参数提供)发出值。
// RxJS v6+
import { from } from 'rxjs';
import { skip, filter } from 'rxjs/operators';
const numArrayObs = from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
// 3,4,5...
const skipObs = numArrayObs.pipe(skip(2)).subscribe(console.log);
// 3,4,5...
const filterObs = numArrayObs
.pipe(filter((val, index) => index > 1))
.subscribe(console.log);
// 同样的输出!
- takeWhile
发出值,直到提供的表达式结果为 false 。
takeWhile() 和 filter() 的区别
// RxJS v6+
import { of } from 'rxjs';
import { takeWhile, filter } from 'rxjs/operators';
// 发出 3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3
const source = of(3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3);
// 允许值通过直到源发出的值不等于3,然后完成
// 输出: [3, 3, 3]
source
.pipe(takeWhile(it => it === 3))
.subscribe(val => console.log('takeWhile', val));
// 输出: [3, 3, 3, 3, 3, 3, 3]
source
.pipe(filter(it => it === 3))
.subscribe(val => console.log('filter', val));
- reduce
将源 observalbe 的值归并为单个值,当源 observable 完成时将这个值发出。
// RxJS v6+
import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';
const source = of(1, 2, 3, 4);
const example = source.pipe(reduce((acc, val) => acc + val));
// 输出: Sum: 10'
const subscribe = example.subscribe(val => console.log('Sum:', val));
- scan
随着时间的推移进行归并。
// RxJS v6+
import { Subject } from 'rxjs';
import { scan } from 'rxjs/operators';
const subject = new Subject();
// scan 示例,随着时间的推移构建对象
const example = subject.pipe(
scan((acc, curr) => Object.assign({}, acc, curr), {})
);
// 输出累加值
const subscribe = example.subscribe(val =>
console.log('Accumulated object:', val)
);
// subject 发出的值会被添加成对象的属性
// {name: 'Joe'}
subject.next({ name: 'Joe' });
// {name: 'Joe', age: 30}
subject.next({ age: 30 });
// {name: 'Joe', age: 30, favoriteLanguage: 'JavaScript'}
subject.next({ favoriteLanguage: 'JavaScript' });
- concatMap 和 mergeMap
concatMap将值映射成内部 observable,并按顺序订阅和发出。mergeMap映射成 observable 并发出值。
concatMap 和 mergeMap 之间的区别
// RxJS v6+
import { of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';
// 发出延迟值
const source = of(2000, 1000);
// 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
const example = source.pipe(
concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
// 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
const subscribe = example.subscribe(val =>
console.log(`With concatMap: ${val}`)
);
// 展示 concatMap 和 mergeMap 之间的区别
const mergeMapExample = source
.pipe(
// 只是为了确保 meregeMap 的日志晚于 concatMap 示例
delay(5000),
mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
)
.subscribe(val => console.log(`With mergeMap: ${val}`));
使用 promise 进行 mergeMap
// RxJS v6+
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
// 发出 'Hello'
const source = of('Hello');
// mergeMap 还会发出 promise 的结果
const myPromise = val =>
new Promise(resolve => resolve(`${val} World From Promise!`));
// 映射成 promise 并发出结果
const example = source.pipe(mergeMap(val => myPromise(val)));
// 输出: 'Hello World From Promise'
const subscribe = example.subscribe(val => console.log(val));
- switchMap
映射成 observable,完成前一个内部 observable,发出值。switchMap 和其他打平操作符的主要区别是它具有取消效果。在每次发出时,会取消前一个内部 observable (你所提供函数的结果) 的订阅,然后订阅一个新的 observable 。你可以通过短语切换成一个新的 observable来记忆它。
每次点击时重置
// RxJS v6+
import { interval, fromEvent } from 'rxjs';
import { switchMap, mapTo } from 'rxjs/operators';
// 发出每次点击
const source = fromEvent(document, 'click');
// 如果3秒内发生了另一次点击,则消息不会被发出
const example = source.pipe(
switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
);
// (点击)...3s...'Hello I made it!'...(点击)...2s(点击)...
const subscribe = example.subscribe(val => console.log(val));
- exhaustMap
映射成内部 observable,忽略其他值直到该 observable 完成。
// RxJS v6+
import { interval, merge, of } from 'rxjs';
import { delay, take, exhaustMap } from 'rxjs/operators';
const sourceInterval = interval(1000);
const delayedInterval = sourceInterval.pipe(delay(10), take(4));
const exhaustSub = merge(
// 延迟10毫秒,然后开始 interval 并发出4个值
delayedInterval,
// 立即发出
of(true)
)
.pipe(exhaustMap(_ => sourceInterval.pipe(take(5))))
/*
* 第一个发出的值 (of(true)) 会被映射成每秒发出值、
* 5秒后完成的 interval observable 。
* 因为 delayedInterval 的发送是晚于前者的,虽然 observable
* 仍然是活动的,但它们会被忽略。
*
* 与类似的操作符进行下对比:
* concatMap 会进行排队
* switchMap 会在每次发送时切换成新的内部 observable
* mergeMap 会为每个发出值维护新的 subscription
*/
// 输出: 0, 1, 2, 3, 4
.subscribe(val => console.log(val));
五、工具类
- delay
根据给定时间延迟发出值。
- let
拥有完整的 observable 。
// 将数组作为序列发出
const source = Rx.Observable.from([1, 2, 3, 4, 5]);
// 传入你自己的函数来将操作符添加到 observable
const obsArrayPlusYourOperators = yourAppliedOperators => {
return source.map(val => val + 1).let(yourAppliedOperators);
};
const addTenThenTwenty = obs => obs.map(val => val + 10).map(val => val + 20);
const subscribe = obsArrayPlusYourOperators(addTenThenTwenty)
// 输出: 32, 33, 34, 35, 36
.subscribe(val => console.log('let FROM FUNCTION:', val));
- toPromise
将 observable 转换成 promise 。
// 返回基础的 observable
const sample = val => Rx.Observable.of(val).delay(5000);
// 将基础的 observable 转换成 promise
const example = sample('First Example')
.toPromise()
// 输出: 'First Example'
.then(result => {
console.log('From Promise:', result);
});
总结:
响应式优点:使用声明语句来获取动态的值,语义化,状态隔离;
响应式思维:Everything is observable; 无论是数组,DOM事件还是网络请求,都可以被抽象成按照时间序列发生的事件。
网友评论