Rxjs

作者: 北风吹_yfy | 来源:发表于2019-12-18 19:08 被阅读0次

响应式编程简介

响应式编程是一种面向数据流和变化传播的编程方式,即可以在编程中很方便的表达静态或动态的数据流,相关的计算模型会自动将变化的值通过数据流进行传播。
Rxjs就是一个透过Observable组合各种非同步行为的Library,它是一个使用可观察数据流进行异步编程的编程接口,结合了观察者模式、迭代器模式和函数式编程。

Rxjs概念

Rxjs全称Reactive Extension for JavaScript,JavaScript的响应式扩展,响应式的思路就是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦有变化,就会执行事先安排好的各种转换和操作。

核心:可观察序列(Observable),订阅(subscribe),操作符(operators),调度(schedulers)

适用场景

  • 同步和异步
  • 获取与订阅
  • 查询数据的组合
  • 已有数据和未来数据的更新策略

Promise异步编程的缺点:1.单一数据模式;2.对异步事件的控制力很弱

Observable (建立与订阅)

observable在未被订阅之前只是个实例对象,不会有任何输出。在被订阅(subscribe)时进行数据操作,具有多种operators。operator是Observable的方法,可以对元素进行封装处理。一般是返回一个新的observable实例。

observer:用来订阅observable的实例对象,内含next,complete,error三个参数,next一定要有,其他两项可选。

subscription:observable订阅返回的实例,可以用来退订(unsubscribe),也可以与其他订阅合并。

// 建立(发布)
let mouseMove = Observable.fromEvent(DOM,'mousemove');
// 订阅
let subscription = mouseMove.subscribe(x => console.log(x));
// 移除
subscription.unsubscribe();

如果有一个observable内部还是一个observable,可以用mergeAll()将其展开,类似吧二维数组转成一位数组,一般会和map一起使用,所以通常简写为mergeMap,但在序列元素处理过程中我只想去最新处理的元素可以使用switchMap(),如果是需要保留正在处理的,让新的即将处理暂时等待旧的处理完再执行可以用exhaustMap()

Observable
  .fromEvent(DOM,'click')
  .switchMap(()=>
    ajax('url')
  )

对于请求的处理,可以设置在多少次请求失败之后再抛出异常。

const defaultData = {success:false,data:[]};
const getPostObservable = () => 
    Observable.ajax('url')
      .retry(3) // 若三次都请求失败再进入到catch
      .catch(() => Observable.of(defaultData));
      // 简化catch
      .catch(() => [defaultData]);
  • Rxjs使得我们能够完全依赖于声明语句来获取动态的值。
// js常规代码
let a = 3;
let b = a * 10;

a = 9
b = a * 10 
console.log(b)

// rxjs代码
import { interval } from 'rxjs'; 
import { map } from 'rxjs/operators';

const a = interval(1000).pipe(
  map(_ => Math.random().toFixed(1)),
);

const b = a.pipe(
  map(item => Number(item) * 10)
)
// 实时监听a依赖的变化
b.subscribe(v => console.log(v))
  • Rxjs监听DOM事件

fromEvent将指定元素上的事件转换成事件流

const btn = document.getElementById('btn');

// js常规代码
let pre = new Date().getTime();

btn.addEventListener('click',function(event){
  const now = new Date().getTime();
  const silence = now - pre;
  if(silence > 1000){
    pre = now;
    console.log(event);
  }
})

// rxjs代码
// fromEvent将制定元素上的事件转换成事件流
import { fromEvent } from 'rxjs'; 
import { throttleTime } from 'rxjs/operators';

const obs = fromEvent(btn,'click').pipe(
  throttleTime(1000),
);
obs.subscribe(v => console.log(v))
  • Rxjs处理网络请求
import { fromEvent,interval,from,merge } from 'rxjs'; 
import { mergeMap } from 'rxjs/operators';

const btn = document.getElementById('btn')
// DOM事件流---
const click = fromEvent(btn,'click');
// 程序逻辑流---
const inner = interval(3000);

const request = from(fetch('https://api.github.com/users').then(res => res.json()))
// 合并事件流
const response = merge(
  click,
  inner,
).pipe(
  mergeMap(_ => request)
)
response.subscribe(v => console.log(v))

操作符(Operators)

操作符是 Observable 类型上的方法,比如 .map(...)、.filter(...)、.merge(...),等等。其本质上是纯函数,当操作符被调用时,它们不会改变已经存在的 Observable 实例,是一个无副作用的操作。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。

操作符有着不同的用途,它们可作如下分类:创建、转换、过滤、组合、错误处理、工具,等等。这里介绍一些常用的操作符。

一、创造observabl类

  • create
// RxJS v6+
import { Observable } from 'rxjs'; 
/*
  创建在订阅函数中发出 'Hello' 和 'World' 的 observable 。
*/
const hello = Observable.create(function(observer) {
  observer.next('Hello');
  observer.next('World');
});

// 输出: 'Hello'...'World'
const subscribe = hello.subscribe(val => console.log(val));
  • of

of类似于一个迭代器,将参数迭代然后发出。它接收任意多个参数,参数可以是任意类型,然后它会把这些参数逐个放入流中。

// RxJS v6+
import { of } from 'rxjs';
// 依次发出提供的任意数量的值
const source = of(1, 2, 3, 4, 5, { name: 'Brian' }, [1, 2, 3], function hello() {
  return 'Hello';
});
// 输出: 1,2,3,4,5,{name: 'Brian}, [1,2,3], function hello() { return 'Hello' }
const subscribe = source.subscribe(val => console.log(val));
  • from

from的参数必须是一个类数组(set,iterator,promise等),其他和of一样

// RxJS v6+
import { from } from 'rxjs';

// 将数组作为值的序列发出
const arraySource = from([1, 2, 3, 4, 5]);
// 输出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
  • fromEvent

将事件转换成 observable 序列。

// RxJS v6+
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

// 创建发出点击事件的 observable
const source = fromEvent(document, 'click');
// 映射成给定的事件时间戳
const example = source.pipe(map(event => `Event time: ${event.timeStamp}`));
// 输出 (示例中的数字以运行时为准): 'Event time: 7276.390000000001'
const subscribe = example.subscribe(val => console.log(val));
  • fromPromise

创建由 promise 转换而来的 observable,并发出 promise 的结果。

import { of } from 'rxjs/observable/of';
import { fromPromise } from 'rxjs/observable/fromPromise';
import { mergeMap, catchError } from 'rxjs/operators';

// 基于输入来决定是 resolve 还是 reject 的示例 promise
const myPromise = willReject => {
  return new Promise((resolve, reject) => {
    if (willReject) {
      reject('Rejected!');
    }
    resolve('Resolved!');
  });
};
// 先发出 true,然后是 false
const source = of(true, false);
const example = source.pipe(
  mergeMap(val =>
    fromPromise(myPromise(val)).pipe(
      // 捕获并优雅地处理 reject 的结果
      catchError(error => of(`Error: ${error}`))
    )
  )
);
// 输出: 'Error: Rejected!', 'Resolved!'
const subscribe = example.subscribe(val => console.log(val));
  • timer

timer有两个参数,第一个参数表示到发送第一个值的间隔时间,第二个参数表示从发送第二个参数开始,没发送一个值的间隔时间,如果第二个参数为空则发送第一个参数后,终止,执行complete函数。

// RxJS v6+
import { timer } from 'rxjs';
/*
  timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
  然后每2秒发出序列值
*/
const source = timer(1000, 2000);
// 输出: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));
  • interval

基于给定时间间隔发出数字序列。相当于 timer(1000, 1000),也就是说初始等待时间和间隔时间是一样的。

// RxJS v6+
import { interval } from 'rxjs';

// 每1秒发出数字序列中的值
const source = interval(1000);
// 数字: 0,1,2,3,4,5....
const subscribe = source.subscribe(val => console.log(val));

二、合并创建器

一般有DOM事件流和程序逻辑流,我们可以对这些流进行不同形式的合并,创建一个新的流,常见的合并方式有三种:并联、串联、拉链。

  • merge - 并联
// RxJS v6+
import { merge } from 'rxjs/operators';
import { interval, merge } from 'rxjs';

// 每2.5秒发出值
const first = interval(2500);
// 每1秒发出值
const second = interval(1000);
// 作为实例方法使用
const example = first.pipe(merge(second));
// 输出: 0,1,0,2....
const subscribe = example.subscribe(val => console.log(val));

// 从一个 observable 中发出输出值
const example1 = merge(
  first.pipe(mapTo('FIRST!')),
  second.pipe(mapTo('SECOND!')),
);
// 输出: "SECOND!", "FIRST!"
const subscribe1 = example1.subscribe(val => console.log(val));
  • concat - 串联

例如;两个流中的内容被按照顺序放进了输出流中,前面的流尚未结束时,后面的流就会一直等待。

// RxJS v6+
import { of, concat } from 'rxjs';

// 发出 1,2,3
const sourceOne = of(1, 2, 3);
// 发出 4,5,6
const sourceTwo = of(4, 5, 6);

// 作为静态方法使用
const example = concat(sourceOne, sourceTwo);
// 输出: 1,2,3,4,5,6
const subscribe = example.subscribe(val => console.log(val));
  • zip - 拉链

在所有 observables 发出后,将它们的值作为数组发出,通常用于合并两个数据有对应关系的数据源。

// RxJS v6+
import { delay } from 'rxjs/operators';
import { of, zip } from 'rxjs';

const sourceOne = of('Hello');
const sourceTwo = of('World!');
// 一直等到所有 observables 都发出一个值,才将所有值作为数组发出
const example = zip(
  sourceOne,
  sourceTwo.pipe(delay(1000)),
);
// 输出: ["Hello", "World!"]
const subscribe = example.subscribe(val => console.log(val));

三、错误处理

  • retry - 失败时重试

有些错误是可以通过重试进行恢复的,比如临时性的网络丢包。甚至一些流程的设计还会故意借助重试机制,比如当你发起请求时,如果后端发现你没有登录过,就会给你一个 401 错误,然后你可以完成登录并重新开始整个流程。

retry 操作符就是负责在失败时自动发起重试的,它可以接受一个参数,用来指定最大重试次数。

// RxJS v6+
import { interval, of, throwError } from 'rxjs';
import { mergeMap, retry } from 'rxjs/operators';

// 每1秒发出值
const source = interval(1000);
const example = source.pipe(
  mergeMap(val => {
    // 抛出错误以进行演示
    if (val > 5) {
      return throwError('Error!');
    }
    return of(val);
  }),
  // 出错的话可以重试2次
  retry(2)
);
/*
  输出:
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  "Error!: Retried 2 times then quit!"
*/
const subscribe = example.subscribe({
  next: val => console.log(val),
  error: val => console.log(`${val}: Retried 2 times then quit!`)
});

四、选择器类

  • take

有的时候我门希望获取Observable前几个数然后结束(执行complete方法)

var source = interval(1000);
var example = source.pipe(take(3));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete

取得首次点击的坐标

<div id="locationDisplay">
  Where would you click first?
</div>

// RxJS v6+
import { fromEvent } from 'rxjs';
import { take, tap } from 'rxjs/operators';

const oneClickEvent = fromEvent(document, 'click').pipe(
  take(1),
  tap(v => {
    document.getElementById(
      'locationDisplay'
    ).innerHTML = `Your first click was on location ${v.screenX}:${v.screenY}`;
  })
);

const subscribe = oneClickEvent.subscribe();
  • takeUntil

发出值,直到提供的 observable 发出值,它便完成。

// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// 每1秒发出值
const source = interval(1000);
// 5秒后发出值
const timer$ = timer(5000);
// 当5秒后 timer 发出值时, source 则完成
const example = source.pipe(takeUntil(timer$));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
  • filter

过滤出符合给定条件的值。

// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

// 发出 ({name: 'Joe', age: 31}, {name: 'Bob', age:25})
const source = from([{ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 }]);
// 过滤掉年龄小于30岁的人
const example = source.pipe(filter(person => person.age >= 30));
// 输出: "Over 30: Joe"
const subscribe = example.subscribe(val => console.log(`Over 30: ${val.name}`));
  • first

发出第一个值或第一个通过给定表达式的值,与之相对应的还有last。

// RxJS v6+
import { from } from 'rxjs';
import { first } from 'rxjs/operators';

const source = from([1, 2, 3, 4, 5]);
// 没有参数则发出第一个值
const example = source.pipe(first());
// 发出通过测试的第一项
const example = source.pipe(first(num => num === 5));
// 输出: "First to pass test: 5"
// 没有值通过的话则发出默认值
const example = source.pipe(first(val => val > 5, 'Nothing'));
// 输出: 'Nothing'
const subscribe = example.subscribe(val =>
  console.log(`First to pass test: ${val}`)
);

  • skip

跳过N个(由参数提供)发出值。

// RxJS v6+
import { from } from 'rxjs';
import { skip, filter } from 'rxjs/operators';

const numArrayObs = from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// 3,4,5...
const skipObs = numArrayObs.pipe(skip(2)).subscribe(console.log);

// 3,4,5...
const filterObs = numArrayObs
  .pipe(filter((val, index) => index > 1))
  .subscribe(console.log);
// 同样的输出!
  • takeWhile

发出值,直到提供的表达式结果为 false 。

takeWhile() 和 filter() 的区别

// RxJS v6+
import { of } from 'rxjs';
import { takeWhile, filter } from 'rxjs/operators';

// 发出 3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3
const source = of(3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3);

// 允许值通过直到源发出的值不等于3,然后完成
// 输出: [3, 3, 3]
source
  .pipe(takeWhile(it => it === 3))
  .subscribe(val => console.log('takeWhile', val));

// 输出: [3, 3, 3, 3, 3, 3, 3]
source
  .pipe(filter(it => it === 3))
  .subscribe(val => console.log('filter', val));

  • reduce

将源 observalbe 的值归并为单个值,当源 observable 完成时将这个值发出。

// RxJS v6+
import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';

const source = of(1, 2, 3, 4);
const example = source.pipe(reduce((acc, val) => acc + val));
// 输出: Sum: 10'
const subscribe = example.subscribe(val => console.log('Sum:', val));
  • scan

随着时间的推移进行归并。

// RxJS v6+
import { Subject } from 'rxjs';
import { scan } from 'rxjs/operators';

const subject = new Subject();
// scan 示例,随着时间的推移构建对象
const example = subject.pipe(
  scan((acc, curr) => Object.assign({}, acc, curr), {})
);
// 输出累加值
const subscribe = example.subscribe(val =>
  console.log('Accumulated object:', val)
);
// subject 发出的值会被添加成对象的属性
// {name: 'Joe'}
subject.next({ name: 'Joe' });
// {name: 'Joe', age: 30}
subject.next({ age: 30 });
// {name: 'Joe', age: 30, favoriteLanguage: 'JavaScript'}
subject.next({ favoriteLanguage: 'JavaScript' });
  • concatMap 和 mergeMap

concatMap将值映射成内部 observable,并按顺序订阅和发出。mergeMap映射成 observable 并发出值。

concatMap 和 mergeMap 之间的区别

// RxJS v6+
import { of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

// 发出延迟值
const source = of(2000, 1000);
// 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
const example = source.pipe(
  concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
// 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
const subscribe = example.subscribe(val =>
  console.log(`With concatMap: ${val}`)
);

// 展示 concatMap 和 mergeMap 之间的区别
const mergeMapExample = source
  .pipe(
    // 只是为了确保 meregeMap 的日志晚于 concatMap 示例
    delay(5000),
    mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
  )
  .subscribe(val => console.log(`With mergeMap: ${val}`));

使用 promise 进行 mergeMap

// RxJS v6+
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// 发出 'Hello'
const source = of('Hello');
// mergeMap 还会发出 promise 的结果
const myPromise = val =>
  new Promise(resolve => resolve(`${val} World From Promise!`));
// 映射成 promise 并发出结果
const example = source.pipe(mergeMap(val => myPromise(val)));
// 输出: 'Hello World From Promise'
const subscribe = example.subscribe(val => console.log(val));

  • switchMap

映射成 observable,完成前一个内部 observable,发出值。switchMap 和其他打平操作符的主要区别是它具有取消效果。在每次发出时,会取消前一个内部 observable (你所提供函数的结果) 的订阅,然后订阅一个新的 observable 。你可以通过短语切换成一个新的 observable来记忆它。

每次点击时重置

// RxJS v6+
import { interval, fromEvent } from 'rxjs';
import { switchMap, mapTo } from 'rxjs/operators';

// 发出每次点击
const source = fromEvent(document, 'click');
// 如果3秒内发生了另一次点击,则消息不会被发出
const example = source.pipe(
  switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
);
// (点击)...3s...'Hello I made it!'...(点击)...2s(点击)...
const subscribe = example.subscribe(val => console.log(val));

  • exhaustMap

映射成内部 observable,忽略其他值直到该 observable 完成。

// RxJS v6+
import { interval, merge, of } from 'rxjs';
import { delay, take, exhaustMap } from 'rxjs/operators';

const sourceInterval = interval(1000);
const delayedInterval = sourceInterval.pipe(delay(10), take(4));

const exhaustSub = merge(
  // 延迟10毫秒,然后开始 interval 并发出4个值
  delayedInterval,
  // 立即发出
  of(true)
)
  .pipe(exhaustMap(_ => sourceInterval.pipe(take(5))))
  /*
   *  第一个发出的值 (of(true)) 会被映射成每秒发出值、 
   *  5秒后完成的 interval observable 。
   *  因为 delayedInterval 的发送是晚于前者的,虽然 observable 
   *  仍然是活动的,但它们会被忽略。
   *
   *  与类似的操作符进行下对比:
   *  concatMap 会进行排队
   *  switchMap 会在每次发送时切换成新的内部 observable
   *  mergeMap 会为每个发出值维护新的 subscription
   */
  // 输出: 0, 1, 2, 3, 4
  .subscribe(val => console.log(val));

五、工具类

  • delay

根据给定时间延迟发出值。

  • let

拥有完整的 observable 。

// 将数组作为序列发出
const source = Rx.Observable.from([1, 2, 3, 4, 5]);

// 传入你自己的函数来将操作符添加到 observable 
const obsArrayPlusYourOperators = yourAppliedOperators => {
  return source.map(val => val + 1).let(yourAppliedOperators);
};
const addTenThenTwenty = obs => obs.map(val => val + 10).map(val => val + 20);
const subscribe = obsArrayPlusYourOperators(addTenThenTwenty)
    // 输出: 32, 33, 34, 35, 36
    .subscribe(val => console.log('let FROM FUNCTION:', val));
  • toPromise

将 observable 转换成 promise 。

// 返回基础的 observable
const sample = val => Rx.Observable.of(val).delay(5000);
// 将基础的 observable 转换成 promise
const example = sample('First Example')
  .toPromise()
  // 输出: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

总结:

响应式优点:使用声明语句来获取动态的值,语义化,状态隔离;

响应式思维:Everything is observable; 无论是数组,DOM事件还是网络请求,都可以被抽象成按照时间序列发生的事件。

相关文章

网友评论

      本文标题:Rxjs

      本文链接:https://www.haomeiwen.com/subject/oypunctx.html