美文网首页
RxJava2 just操作符

RxJava2 just操作符

作者: 百吉猫 | 来源:发表于2018-01-16 16:11 被阅读0次

最基本的观察者模式调用,观察者,被观察者,订阅

    Observable.just("A")
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                }
            });

我们需要明白三点,1.被观察者Observable何时创建?2.观察者Observer何时创建?3.被观察者与观察者如何subscribe订阅?首先查看Observable、Observer是如何定义的。

被观察者Observable为抽象类 实现 ObservableSource接口,ObservableSource接口中定义了subscribe(Observer observer)订阅观察者方法。
    public abstract class Observable<T> implements ObservableSource<T> {
      //实现ObservableSource subscribe()方法,调用自己的抽象方法subscribeActual()
        @Override
        public final void subscribe(Observer<? super T> observer) {
                observer = RxJavaPlugins.onSubscribe(this, observer);
                subscribeActual(observer);
        }
        protected abstract void subscribeActual(Observer<? super T> observer);
    }

    public interface ObservableSource<T> {
        void subscribe(@NonNull Observer<? super T> observer);
    }
观察者Observer
    public interface Observer<T> {
        //订阅时回调
        void onSubscribe(@NonNull Disposable d);
        //成功回调
        void onNext(@NonNull T t);
        //错误回调
        void onError(@NonNull Throwable e);
        //完成时回调
        void onComplete();
    }
首先看代码第一行Observable.just("A")
    public static <T> Observable<T> just (T item){
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

先创建ObservableJust对象,然后调用RxJavaPlugins.onAssembly方法返回Observable对象

    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
        private final T value;
        public ObservableJust(final T value) {
            this.value = value;
        }
        @Override
        protected void subscribeActual(Observer<? super T> s) {
            ObservableScalarXMap.ScalarDisposable<T> sd = new   ObservableScalarXMap.ScalarDisposable<T>(s, value);
            s.onSubscribe(sd);
            sd.run();
        }
    }
   //onAssembly方法中 onObservableAssembly在just操作符时无意义,直接返回Observable<T> source对象
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
Observable<T> just方法返回Observable对象即为ObservableJust对象,也就是真实的被观察着对象,所以Observable. just方法调用开时是即创建ObservableJust被观察者对象
接着看代码第二行Observable.just("A").subscribe()
public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
//最终会调用全参subscribe方法
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}
//首先创建LambdaObserver对象,然后调用subscribe(Observer<? super T> observer)方法
public final class LambdaObserver<T> implements Observer<T>, Disposable {
    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                          Action onComplete,
                          Consumer<? super Disposable> onSubscribe) {
        super();
        //省略代码
    }
}
此时创建了LambdaObserver观察者对象,然后调用Observable.subscribe()完成订阅事件。整个观察者模式调用结束。所以在Observable.just("A").subscribe()时就创建了观察者对象并进行订阅。

那么观察者模式已经创建成功具体是怎样执行的呢?just操作符时是如何回调到Consumer.accept()方法当中的呢?
此时我们的被察者对象为ObservableJust,观察者对象为LambdaObserver,真是订阅方法为subscribe(Observer<? super T> observer)。

 public final void subscribe(Observer<? super T> observer) {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        subscribeActual(observer);
        throw npe;
    }
}
//RxJavaPlugins.onSubscribe此时意义不大直接返回Observer观察者对象
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
    BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}

最终调用subscribeActual()方法,Observable中的subscribeActual()即是ObservableJust的实现方法,此时又回到ObservableJust中

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

private final T value;
public ObservableJust(final T value) {
    this.value = value;
}

@Override
protected void subscribeActual(Observer<? super T> s) {
    ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
    s.onSubscribe(sd);
    sd.run();
}

@Override
public T call() {
    return value;
}
}

此时创建ScalarDisposable对象,然后执行Observer中的onSubscribe()方法,所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。
然后执行ScalarDisposable的run()方法。

public static final class ScalarDisposable<T> implements Runnable {
    final Observer<? super T> observer;
    final T value;
    static final int START = 0;
    static final int FUSED = 1;
    static final int ON_NEXT = 2;
    static final int ON_COMPLETE = 3;
    public ScalarDisposable(Observer<? super T> observer, T value) {
        this.observer = observer;
        this.value = value;
    }
    @Override
    public void run() {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
}

run()方法中首先调用Observer的onNext()方法,此时观察者收到成功回调
然后调用Observer的onComplete()方法,此时观察者收到完成回调,整个观察者模式执行结束。

相关文章

网友评论

      本文标题:RxJava2 just操作符

      本文链接:https://www.haomeiwen.com/subject/fyvsoxtx.html