美文网首页
rxjs 自定义 operator

rxjs 自定义 operator

作者: chenjinchao1997 | 来源:发表于2020-06-24 09:31 被阅读0次

    查看 rxjs 源码,我们取一个比较简单的操作符 every 作为例子。从 every 的实现可以看到,every 函数调用 source 的 lift 函数,传入自定义的两个类 EveryOperator 和 EverySubscriber。接下来看代码注释。

    // Observable.js
    import { canReportError } from "./util/canReportError";
    import { toSubscriber } from "./util/toSubscriber";
    import { observable as Symbol_observable } from "./symbol/observable";
    import { pipeFromArray } from "./util/pipe";
    import { config } from "./config";
    export class Observable {
      constructor(subscribe) {
        this._isScalar = false;
        if (subscribe) {
          this._subscribe = subscribe;
        }
      }
      // 返回一个新的Observable,将source置为自身,operator置为输入的operator
      lift(operator) {
        const observable = new Observable();
        observable.source = this;
        observable.operator = operator;
        return observable;
      }
      subscribe(observerOrNext, error, complete) {
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
        if (operator) {
          // 下面设置的EveryOperator call方法就是在这里使用
          // 这里传入Subscriber 并且把自身的上一个Observable传过去
          sink.add(operator.call(sink, this.source));
        } else {
          sink.add(
            this.source ||
              (config.useDeprecatedSynchronousErrorHandling &&
                !sink.syncErrorThrowable)
              ? this._subscribe(sink)
              : this._trySubscribe(sink)
          );
        }
        if (config.useDeprecatedSynchronousErrorHandling) {
          if (sink.syncErrorThrowable) {
            sink.syncErrorThrowable = false;
            if (sink.syncErrorThrown) {
              throw sink.syncErrorValue;
            }
          }
        }
        return sink;
      }
      ...
    }
    
    // every.js
    
    import { Subscriber } from "../Subscriber";
    export function every(predicate, thisArg) {
      // 返回一个生成新Observable(source为传入参数,operator为every)的函数
      return (source) => source.lift(new EveryOperator(predicate, thisArg, source));
    }
    class EveryOperator {
      constructor(predicate, thisArg, source) {
        this.predicate = predicate;
        this.thisArg = thisArg;
        this.source = source;
      }
      // 当拥有当前Operator的Observable被subscribe的时候 会调用call(subscriber, this.source)
      // observer subscribe函数传入参数构建的
      // 这里的source是当前Observable的上一个Observable
      call(observer, source) {
        // 这里订阅了上一个Observable
        return source.subscribe(
          // 这里实际上就会生成一个标准的Observer/Subscriber 有next error complete
          // 这里的this.source 跟source是同一个source(应该)
          new EverySubscriber(observer, this.predicate, this.thisArg, this.source)
        );
      }
    }
    // 继承Subscriber
    class EverySubscriber extends Subscriber {
      // destination 就是给我们调用next 传入处理后的值
      // 从source进行订阅
      constructor(destination, predicate, thisArg, source) {
        super(destination);
        this.predicate = predicate;
        this.thisArg = thisArg;
        this.source = source;
        this.index = 0;
        this.thisArg = thisArg || this;
      }
      notifyComplete(everyValueMatch) {
        // every的性质导致他只会调用一次next 就马上到complete
        this.destination.next(everyValueMatch);
        this.destination.complete();
      }
      // 每次有值过来 就会调用到_next
      _next(value) {
        let result = false;
        try {
          result = this.predicate.call(
            this.thisArg,
            value,
            this.index++,
            this.source
          );
        } catch (err) {
          this.destination.error(err);
          return;
        }
        if (!result) {
          this.notifyComplete(false);
        }
      }
      // 上游complete 触发_complete
      _complete() {
        this.notifyComplete(true);
      }
    }
    //# sourceMappingURL=every.js.map
    

    当然,我们自己去封装操作符的时候不一定要按照它的逻辑。我们通过上面代码的分析,我们可以一个操作符函数为

    const myoperator = (myargs) => (source) => new Observable();
    

    其中,你要返回一个 Observable,在构造 Observable 时通过 subscribe 上一个 Observable 即 source,进行 next 向下游发出值。

    这个 Observable 的构造方式可以是通过 Observable.prototype.lift (source.lift)构建,通过传入一个带有 call 方法的 Operator 类,而这个类的 call 方法返回一个 Subscription。

    下面做一个简单的 square 函数示例。

    // 使用Observable.create
    const square = () => (source) =>
      Observable.create((subscriber) => {
        const subscription = source.subscribe(
          (value) => {
            subscriber.next(value * value);
          },
          (err) => {
            subscriber.error(err);
          },
          () => {
            subscriber.complete();
          }
        );
        // 这里要返回subscription 参考TeardownLogic
        return subscription;
      });
    // 使用lift
    class SquareOperator {
      constructor(thisArg, source) {
        this.source = source;
      }
      call(subscriber, source) {
        return source.subscribe({
          next: (value) => {
            subscriber.next(value * value);
          },
          error: (error) => {
            subscriber.error(error);
          },
          complete: () => {
            subscriber.complete;
          },
        });
      }
    }
    const square = () => (source, thisArg) =>
      source.lift(new SquareOperator(thisArg, source));
    

    相关文章

      网友评论

          本文标题:rxjs 自定义 operator

          本文链接:https://www.haomeiwen.com/subject/mnglfktx.html