Observable - 观察者模式的理解

作者: lolivialucky | 来源:发表于2017-12-01 13:50 被阅读83次

    创建一个Observable的代码

    const hello = Rx.Observable.create(function(observer) {
       observer.next('Hello');
       observer.next('World');
    });
    
    • create传入的参数为一个subscriber函数,该函数有一个参数为observer,observer是一个观察者对象,该对象有三个属性,属性值为函数。如下:
    var observer = {
      next: x => console.log('Observer got a next value: ' + x),
      error: err => console.error('Observer got an error: ' + err),
      complete: () => console.log('Observer got a complete notification'),
    };
    
    • 当observable被订阅的时候,即observable.subscribe(function next(x), function error(x), function complete(x)), 创建该observable时传入的回调函数即被执行,observer调用next方法,并传入参数(即发射值),发射出去的值则作为subscribe中回调函数的参数,即传入function next(x)中的x,并执行该回调函数。 自此只要被观察者发生了变化,就会调用create的回调函数将变化的值通知观察者,并执行对应observer的next()方法。

    Observable - 被观察者
    Observable.subscribe(observer) - 对被观察者进行订阅
    Observer - 观察者
    创建被观察者时,传入一个订阅函数,当观察者对被观察者进行订阅时,就调用这个订阅函数,订阅函数的作用是向观察者发射数据,通知数据(流)的变化。观察者根据发射过来的不同数据,自行处理对应处理函数里的逻辑。

    源码

    看看源码做了什么:

    创建Observable的过程:

    (1) 将create()的参数(一个匿名函数function(observer){})作为参数传入构造函数,并保存该匿名函数到类变量this._subscribe。

    Observable.create = function (subscribe) {
            return new Observable(subscribe);
        };
        return Observable;
    
    function Observable(subscribe) {
            this._isScalar = false;
            if (subscribe) {
                this._subscribe = subscribe;
            }
    
    调用observable.subscribe的过程:

    (1) 将subscribe()里的回调函数传入。
    (2) 通过toSubscriber_1.toSubscriber包装成一个Subsciber对象即下文中的sink。该对象有一些属性,其中包括next, error, complete属性分别指向subscribe中对应的回调函数。并通过this._subscribe(sink),将sink传值给Rx.Observable.create(function(observer) {..})中的observer参数。
    (3) 通过this._subscribe(sink)调用了create()里的回调函数即function(observer) { observer.next('Hello');},并执行对应的next()函数,并传入对应的参数。

    Observable.prototype.subscribe = function (observerOrNext, error, complete) {
            var operator = this.operator;
            var sink = toSubscriber_1.toSubscriber(observerOrNext, error, complete);
            if (operator) {
                operator.call(sink, this.source);
            }
            else {
                sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink));
            }
            if (sink.syncErrorThrowable) {
                sink.syncErrorThrowable = false;
                if (sink.syncErrorThrown) {
                    throw sink.syncErrorValue;
                }
            }
            return sink;
        };
    Observable.prototype._trySubscribe = function (sink) {
            try {
                return this._subscribe(sink);
            }
            catch (err) {
                sink.syncErrorThrown = true;
                sink.syncErrorValue = err;
                sink.error(err);
            }
        };
    

    相关文章

      网友评论

        本文标题:Observable - 观察者模式的理解

        本文链接:https://www.haomeiwen.com/subject/jktwbxtx.html