美文网首页程序员
从一个简单的rxjs demo来看rxjs部分源码

从一个简单的rxjs demo来看rxjs部分源码

作者: Yard | 来源:发表于2018-10-18 17:18 被阅读219次

    demo 地址 https://github.com/YardWill/beginner-reactive-programming-with-rxjs

    我们通过四个例子来看rxjs.

    1. example-1

    const promise = new Promise((resolve) => {
      setTimeout(() => {
        resolve('Hello from a Promise!');
      }, 2000);
    });
    
    promise.then(value => console.log(value));
    

    这是一个简单的promise实现2000ms之后的输出。我们接下来使用rxjs来试试如何实现这个功能。

    2. example-2

    import { Observable } from 'rxjs/Observable';
    
    const observable = new Observable((observer) => {
      setTimeout(() => {
        observer.next('Hello from a Observable!');
      }, 2000);
    });
    
    observable.subscribe(value => console.log(value));
    

    我们先通过Observable的接口来注册一个可观测对象,(如果对观察者模式不太熟悉的话可以先看这篇文章观察者简单实现

    之后我们使用subscribe来订阅一个事件(subscribe函数内部执行在observer内注册的函数),当observer触发next的时候就执行subscribe内注册的函数。

    我们来看看源码。
    以下是Observable的构造函数,可以看出,除了绑定subscribe函数之外,什么都没有做。

    constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
      if (subscribe) {
        this._subscribe = subscribe;
      }
    }
    

    下面是Observable类上的subscribe方法。简单来讲就是去执行了在constructor内注册的subscribe函数,并将observerOrNext push到subscriptions内。然后return的对象是一个subscriptions。

      subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
                error?: (error: any) => void,
                complete?: () => void): Subscription {
    
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
    
        if (operator) {
          operator.call(sink, this.source);
        } else {
          sink.add(
            this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
            this._subscribe(sink) :
            this._trySubscribe(sink)
          );
        }
    
        if (config.useDeprecatedSynchronousErrorHandling) {
          if (sink.syncErrorThrowable) {
            sink.syncErrorThrowable = false;
            if (sink.syncErrorThrown) {
              throw sink.syncErrorValue;
            }
          }
        }
    
        return sink;
      }
    

    你一定觉得很疑惑Observable、subscribe、subscriptions这些名字都代表什么意思。我们下面来讲一个通俗的故事。

    从前有一家鲜奶公司,提供不同种类的鲜奶,每天都会送货一次。有一天,小明花了20/天的钱去订了这家公司的进口鲜奶,小花花了10元/天的钱订了国产鲜奶。那么公司收到的订单列表就是这两个,公司会在固定时间去把鲜奶送到用户的手上。

    我们看看这个故事里面的observable、subscribe、subscriptions分别是什么?

    1. observable: 毫无疑问,鲜奶公司是一个可订阅对象,我们可以向鲜奶公司订阅我们需要的鲜奶。
    2. subscribe: subscribe这是一个动作,小明和小花去定了这家鲜奶公司的牛奶。
    3. subscriptions: 订完牛奶后,小明和小花的订单就已经在鲜奶公司的订单列表上了,这个订单列表就是subscriptions。
    4. 另外,我们将setTimeout改成setInterval,这时我们就可以想象鲜奶公司每天都会触发发货的工作,也就是执行next方法。next方法可以当做是鲜奶公司对照着订单列表对小明和小花进行发货。

    这样看起来,理解这几个对象应该不难了吧。我们把这个故事改编成代码。

    import { Observable } from 'rxjs/Observable';
    
    // 鲜奶公司
    const interval$ = new Observable((observer) => {
      let count = 0;
      const interval = setInterval(() => {
        console.log('鲜奶公司准时发货');
        observer.next(count += 1);
      }, 1000);
    
      return () => {
        clearInterval(interval);
      };
    });
    
    // 小明订奶
    const littleMing = count => console.log('小明收到', count, '瓶奶');
    // 小花订奶
    const littleHua = count => console.log('小花收到', count, '瓶奶');
    const subscription1 = interval$.subscribe(littleMing);
    const subscription2 = interval$.subscribe(littleHua);
    

    subscriptions包含[subscription1, subscription2]。
    那么接下来我们再来思考一个问题,如果小明不想继续订牛奶了,他应该怎么通知鲜奶公司不再发货?我们来看 example-3

    3. example-3

    我们通过上面的故事来修改一下我们的代码。

    import { Observable } from 'rxjs/Observable';
    
    // 鲜奶公司
    const interval$ = new Observable((observer) => {
      let count = 0;
      const interval = setInterval(() => {
        console.log('鲜奶公司准时发货');
        observer.next(count += 1);
      }, 1000);
    
      return () => {
        clearInterval(interval);
      };
    });
    
    // 小明订奶
    const littleMing = count => console.log('小明收到', count, '瓶奶');
    // 小花订奶
    const littleHua = count => console.log('小花收到', count, '瓶奶');
    const subscription1 = interval$.subscribe(littleMing);
    const subscription2 = interval$.subscribe(littleHua);
    setTimeout(() => subscription1.unsubscribe(), 3000);
    

    我们可以看到在最后我们把subscription1给unsubscribe了,我们看看unsubscribe函数内做了什么?

      /**
       * Disposes the resources held by the subscription. May, for instance, cancel
       * an ongoing Observable execution or cancel any other type of work that
       * started when the Subscription was created.
       * @return {void}
       */
      unsubscribe(): void {
        let hasErrors = false;
        let errors: any[];
    
        if (this.closed) {
          return;
        }
    
        let { _parent, _parents, _unsubscribe, _subscriptions } = (<any> this);
    
        this.closed = true;
        this._parent = null;
        this._parents = null;
        // null out _subscriptions first so any child subscriptions that attempt
        // to remove themselves from this subscription will noop
        this._subscriptions = null;
    
        let index = -1;
        let len = _parents ? _parents.length : 0;
    
        // if this._parent is null, then so is this._parents, and we
        // don't have to remove ourselves from any parent subscriptions.
        // 移除subscription
        while (_parent) {
          _parent.remove(this);
          // if this._parents is null or index >= len,
          // then _parent is set to null, and the loop exits
          _parent = ++index < len && _parents[index] || null;
        }
    
        if (isFunction(_unsubscribe)) {
          let trial = tryCatch(_unsubscribe).call(this);
          if (trial === errorObject) {
            hasErrors = true;
            errors = errors || (
              errorObject.e instanceof UnsubscriptionError ?
                flattenUnsubscriptionErrors(errorObject.e.errors) : [errorObject.e]
            );
          }
        }
        
        if (isArray(_subscriptions)) {
    
          index = -1;
          len = _subscriptions.length;
    
          while (++index < len) {
            const sub = _subscriptions[index];
            if (isObject(sub)) {
              let trial = tryCatch(sub.unsubscribe).call(sub);
              if (trial === errorObject) {
                hasErrors = true;
                errors = errors || [];
                let err = errorObject.e;
                if (err instanceof UnsubscriptionError) {
                  errors = errors.concat(flattenUnsubscriptionErrors(err.errors));
                } else {
                  errors.push(err);
                }
              }
            }
          }
        }
    
        if (hasErrors) {
          throw new UnsubscriptionError(errors);
        }
      }
    

    这里的代码理解起来也不难,最后去执行了clearInterval(interval)将定时器去掉,并把当前的订单(subscription)移除出订单列表(subscriptions)。

    4. example-4

    接下来小明和小花都有各自的需求改变,比如小明想要每天两瓶奶,而小花需要隔天收到一瓶奶,那么我们应该怎么做呢?看下面代码。

    import { Observable } from 'rxjs/Observable';
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/filter';
    
    // 鲜奶公司
    const interval$ = new Observable<number>((observer) => {
      let count = 0;
      const interval = setInterval(() => {
        console.log('鲜奶公司准时发货');
        observer.next(count += 1);
      }, 1000);
    
      return () => {
        clearInterval(interval);
      };
    });
    
    // 小明订奶
    const littleMing = count => console.log('小明收到', count, '瓶奶');
    // 小花订奶
    const littleHua = count => console.log('小花收到', count, '瓶奶');
    
    // 小明打算每天多订一份的鲜奶
    const subscription1 = interval$
      .map(value => value * 2)
      .subscribe(littleMing);
    
    // ----1----2----3----4--->
    //      map => x * 2
    // ----2----4----6----8--->
    
    // 小花打算让鲜奶公司隔天送一瓶
    const subscription2 = interval$
      .filter(value => value % 2 === 0)
      .map(value => value / 2)
      .subscribe(littleHua);
    
    // ----1----2----3----4--->
    //      filter & map
    // ---------1---------2--->
    

    在这里我们引入了map和filter这两个rx的操作符,来实现我们需要变更的需求,看起来是不是很简单。当然还有更多的操作符(我们就不一一介绍了),操作符也是rxjs内的很大一部分组成,可以把它比作是lodash内的工具类。

    5. example-5

    我们最后再来看看rxjs在前端事件监听上的用法。

    import { Observable } from 'rxjs/Observable';
    import 'rxjs/add/observable/fromEvent';
    import 'rxjs/add/observable/merge';
    import 'rxjs/add/operator/scan';
    import 'rxjs/add/operator/map';
    
    const incrementClicks$ = Observable.fromEvent(document.getElementById('increment'), 'click');
    const decrementClicks$ = Observable.fromEvent(document.getElementById('decrement'), 'click');
    
    Observable
      .merge(incrementClicks$, decrementClicks$)
      .map((event: any) => parseInt(event.target.value, 10))
      .scan((total, value) => total + value, 0)
      .subscribe((total) => {
        document.getElementById('counter').innerText = total.toString();
      });
    

    merge将两个Observable对象合成一个Observable对象,然后map对数据进行操作。
    scan可以比作一个reducer函数,每一次click之后会对数据进行处理,并保留之前的数据。
    最后我们去subscribe这个事件,然后做出相应修改。

    个人博客 https://www.yardwill.com/

    相关文章

      网友评论

        本文标题:从一个简单的rxjs demo来看rxjs部分源码

        本文链接:https://www.haomeiwen.com/subject/tepjzftx.html