美文网首页
02RxJS创建Observables和Subscription

02RxJS创建Observables和Subscription

作者: learninginto | 来源:发表于2021-01-02 23:48 被阅读0次
    创建Observables

    可以用new Observable创建,但实际情况更多的是用of、from、interval等操作符创建。

    Observer

    Observer用户获取到Observable推送的值,Observers是一系列回调函数,也就是Observable.subscribe的回调函数。

    Observable.subscribe(x => console.log(x))
    

    Observable.subscribe方法有三个回调函数,上面写的是其中最常用的一个

    • Next : 接收Observable推送过来的值
    • Error :接收错误对象
    • Complete : 推送结束时触发(即使出现error),不会收到任何值
    newObservable() {
      const observable$ = new Observable(subscriber => {
        subscriber.next(1)
        // subscriber.error(new Error('出错了'))
        subscriber.next(2)
        subscriber.complete();
        subscriber.next(3)
      })
    
      observable$.subscribe(res => {
        console.log('res', res)
      }, error => {
        console.log('error', error)
      }, () => {
        console.log('complete')
      })
    }
    //res 1
    //res 2 
    //res complete
    

    注:一旦推送Error,就不会再执行后续的推送,包括complete

    • 可以用try/catch捕获错误
    const observable = new Observable(subscribe => {
        try{
            subscriber.next(1);
            subscriber.next(2);
            subscriber.next(3);
            subscriber.complete();
        }catch(err){
            subscriber.error(err)
        }
    })
    
    • 完整的写法

    这里我们可以根据需要,去掉不需要的区块

    observable$.subscribe({
      next(res) {
        console.log('res', res)
      },
      error(error) {
        console.log('error', error)
      },
      complete() {
        console.log('complete')
      }
    })
    
    • 同步操作数据
    newObservable() {
      const observable$ = new Observable(subscriber => {
        subscriber.next(1)
        subscriber.next(2)
        subscriber.next(3)
      })
      console.log('before')
      observable$.subscribe(res => {
        console.log('res', res)
      })
      console.log('after')
    }
    //before
    //res 1
    //res 2
    //res 3
    //after
    
    • 异步操作数据
    newObservable() {
      const observable$ = new Observable(subscriber => {
        subscriber.next(1)
        subscriber.next(2)
        subscriber.next(3)
        setTimeout(() => {
          subscriber.next(300);
        }, 1000)
      })
    
      console.log('before')
      observable$.subscribe(res => {
        console.log('res', res)
      })
      console.log('after')
    }
    //before
    //res 1
    //res 2
    //res 3
    //after
    //300(1秒后)
    
    
    Subscription订阅
    import {interval} from 'rxjs';
    newObservable(){
        const observable = interval(1000);
        const subscription = observable.subscribe(x => console.log(x))
        setTimeout(()=>{
            subscription.unsubscribe();
        },3000)
    }
    

    订阅可以放在一起,以便unsubscribe()对一个订阅中的一个调用可以取消多个订阅。

    import {interval} from 'rxjs'
    newObservable(){
        const observable1 = interval(800);
        const observalbe2 = interval(400);
        
        const subscription = observable1.subscribe(x => console.log('first' + x))
        const childSubscription = observable2.subscribe(x => console.log('second:' + x))
        
        subscription.add(childSubscription);
        
        setTimeout(()=>{
            subscription.unsubscribe();
        },3000)
    }
    
    

    由于通过subscription.add(childSubscription)操作将childSubscription添加到了subscirption中;所以,在取消订阅时,只需要取消父级的操作。

    • 引入

    创建类的操作符在rxjs下,功能类的操作符在rxjs/operators下

    import { AsyncSubject, BehaviorSubject, combineLatest, concat, ConnectableObservable, EMPTY, empty, forkJoin, from, fromEvent, iif, interval, merge, observable, Observable, of, partition, race, range, ReplaySubject, Subject, throwError, timer, zip } from 'rxjs';
    import { audit, auditTime, buffer, bufferCount, bufferTime, bufferToggle, bufferWhen, catchError, combineAll, concatAll, concatMap, concatMapTo, count, debounce, debounceTime, defaultIfEmpty, delay, delayWhen, distinct, distinctUntilChanged, distinctUntilKeyChanged, elementAt, endWith, every, exhaust, exhaustMap, filter, find, findIndex, first, groupBy, ignoreElements, isEmpty, last, map, mapTo, max, mergeAll, mergeMap, mergeMapTo, mergeScan, multicast, pairwise, pluck, publish, publishBehavior, publishLast, publishReplay, reduce, refCount, retry, retryWhen, sample, sampleTime, scan, share, single, skip, skipLast, skipUntil, skipWhile, startWith, switchMap, switchMapTo, take, takeLast, takeUntil, takeWhile, tap, throttle, throttleTime, timeInterval, timeout, timeoutWith, timestamp, toArray, withLatestFrom } from 'rxjs/operators';
    
    

    相关文章

      网友评论

          本文标题:02RxJS创建Observables和Subscription

          本文链接:https://www.haomeiwen.com/subject/asnyoktx.html