RxJava: Single.create and Observable.create

Author: 冉桓彬 | Published: 2019-12-10 16:27

1. Single.create

1.1 singleTest
public void singleTest() {
    Single.create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(SingleEmitter<String> emitter) {
            // Emit exactly one terminal event here, e.g. emitter.onSuccess("value").
        }
    }).subscribe(new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable d) {}

        @Override
        public void onSuccess(String s) {}

        @Override
        public void onError(Throwable e) {}
    });
}
1.2 Single.create
public static <T> Single<T> create(SingleOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new SingleCreate<T>(source));
}

  Single.create returns a SingleCreate object that holds a reference to the SingleOnSubscribe. Next, follow what happens when subscribe is called on that SingleCreate.
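
  The "wrap now, run on subscribe" behaviour described above can be modelled with plain JDK types. This is only a sketch under assumptions: LazySingle and LazySingleDemo are hypothetical names, and a Consumer stands in for both SingleOnSubscribe and SingleEmitter. It is not RxJava's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Single/SingleCreate; not RxJava's real classes.
final class LazySingle<T> {
    // The stored callback, analogous to the SingleOnSubscribe held by SingleCreate.
    private final Consumer<Consumer<T>> source;

    private LazySingle(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    // Analogous to Single.create: validate the argument and wrap it, nothing else.
    static <T> LazySingle<T> create(Consumer<Consumer<T>> source) {
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return new LazySingle<T>(source);
    }

    // Analogous to subscribe: only here is the stored source actually invoked.
    void subscribe(Consumer<T> onSuccess) {
        source.accept(onSuccess);
    }
}

final class LazySingleDemo {
    // Records the order of events so the deferred execution is visible.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazySingle<String> single = LazySingle.<String>create(emitter -> {
            log.add("source ran");
            emitter.accept("hello");
        });
        log.add("created");
        single.subscribe(value -> log.add("received " + value));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

  In this model "created" is logged before "source ran", because the stored callback is not invoked until subscribe is called.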

1.3 Single.subscribe (inherited by SingleCreate)
public final void subscribe(SingleObserver<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");

    observer = RxJavaPlugins.onSubscribe(this, observer);

    ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

    try {
        subscribeActual(observer);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}

  subscribe is a final method defined on Single and inherited by SingleCreate; it delegates to SingleCreate.subscribeActual.

1.4 SingleCreate.subscribeActual
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    Emitter<T> parent = new Emitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

  The difference between Single and Observable lies in the Emitter implementation class that subscribeActual creates.
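
  The order of calls in subscribeActual (onSubscribe first, then the source, with exceptions from the source routed to onError) can be illustrated with a small JDK-only model. MiniObserver, MiniSource, MiniCreate and MiniCreateDemo are hypothetical names; the sketch passes the observer straight to the source instead of wrapping it in an Emitter, and it leaves out the Exceptions.throwIfFatal check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not RxJava's real interfaces.
interface MiniObserver<T> {
    void onSubscribe();
    void onSuccess(T value);
    void onError(Throwable e);
}

interface MiniSource<T> {
    void subscribe(MiniObserver<T> emitter) throws Exception;
}

final class MiniCreate<T> {
    private final MiniSource<T> source;

    MiniCreate(MiniSource<T> source) {
        this.source = source;
    }

    // Same order as subscribeActual: onSubscribe first, then the source,
    // with any exception thrown by the source redirected to onError.
    void subscribeActual(MiniObserver<T> observer) {
        observer.onSubscribe();
        try {
            source.subscribe(observer);
        } catch (Throwable ex) {
            observer.onError(ex);
        }
    }
}

final class MiniCreateDemo {
    // Subscribes to a source that throws and records which callbacks fire.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniCreate<String> failing = new MiniCreate<String>(emitter -> {
            throw new IllegalStateException("boom");
        });
        failing.subscribeActual(new MiniObserver<String>() {
            @Override
            public void onSubscribe() { log.add("onSubscribe"); }

            @Override
            public void onSuccess(String value) { log.add("onSuccess " + value); }

            @Override
            public void onError(Throwable e) { log.add("onError " + e.getMessage()); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

  The exception thrown inside the source does not escape to the caller of subscribeActual; the observer sees it as an onError call after onSubscribe.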

2. Observable.create

  Trace the observableTest flow in the same way.

public void observableTest() {
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
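        public void subscribe(ObservableEmitter<String> emitter) {
            // Emit zero or more items via emitter.onNext(...), then finish with
            // emitter.onComplete() or emitter.onError(...).
        }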
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {}

        @Override
        public void onNext(String s) {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onComplete() {}
    });
}

ObservableCreate.subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

  As the code above shows, Single.create and Observable.create differ in the Emitter they use.

3. SingleCreate.Emitter and ObservableCreate.CreateEmitter

// Nested class of SingleCreate (excerpt; other methods omitted)
static final class Emitter<T> extends AtomicReference<Disposable> implements SingleEmitter<T>, Disposable {

    final SingleObserver<? super T> downstream;

    Emitter(SingleObserver<? super T> downstream) {
        this.downstream = downstream;
    }
    @Override
    public void onSuccess(T value) {
        if (get() != DisposableHelper.DISPOSED) {
            Disposable d = getAndSet(DisposableHelper.DISPOSED);
            if (d != DisposableHelper.DISPOSED) {
                try {
                    if (value == null) {
                        downstream.onError(new NullPointerException("onSuccess called with null. Null values are generally not allowed in 2.x operators and sources."));
                    } else {
                        downstream.onSuccess(value);
                    }
                } finally {
                    if (d != null) {
                        d.dispose();
                    }
                }
            }
        }
    }
}

// Nested class of ObservableCreate (excerpt; other methods omitted)
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
}

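  Both onSuccess and onNext first check whether the emitter has already been disposed, that is, whether downstream has cut its connection to upstream: onNext calls isDisposed(), while onSuccess compares get() against DisposableHelper.DISPOSED.
  The difference: SingleCreate.Emitter swaps its state to DISPOSED inside onSuccess() and disposes any previously attached resource in the finally block, which severs the link between upstream and downstream. The upstream emitter therefore delivers at most one value, and downstream receives at most one. ObservableCreate.CreateEmitter, by contrast, does not call dispose() after onNext(); it disposes only after a terminal event, onComplete() or onError(). With Observable.create, upstream can therefore emit any number of items and downstream can receive each of them.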
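
  The one-shot versus multi-shot contrast can be reproduced with a JDK-only model. This is a sketch under assumptions: OneShotEmitter, MultiShotEmitter and EmitterDemo are hypothetical names, an AtomicBoolean replaces the AtomicReference<Disposable> state used by the real emitters, and a List stands in for the downstream observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of SingleCreate.Emitter: delivers at most one value.
final class OneShotEmitter {
    private final AtomicBoolean disposed = new AtomicBoolean();
    private final List<String> downstream;

    OneShotEmitter(List<String> downstream) {
        this.downstream = downstream;
    }

    // Atomically flip to disposed; only the call that wins the flip delivers.
    void onSuccess(String value) {
        if (disposed.compareAndSet(false, true)) {
            downstream.add(value);
        }
    }
}

// Hypothetical model of ObservableCreate.CreateEmitter: many items, one terminal event.
final class MultiShotEmitter {
    private final AtomicBoolean disposed = new AtomicBoolean();
    private final List<String> downstream;

    MultiShotEmitter(List<String> downstream) {
        this.downstream = downstream;
    }

    // Deliver while not disposed; the emitter stays open afterwards.
    void onNext(String value) {
        if (!disposed.get()) {
            downstream.add(value);
        }
    }

    // Deliver the terminal event, then mark the emitter as disposed.
    void onComplete() {
        if (!disposed.get()) {
            try {
                downstream.add("complete");
            } finally {
                disposed.set(true);
            }
        }
    }
}

final class EmitterDemo {
    static List<String> single() {
        List<String> received = new ArrayList<>();
        OneShotEmitter emitter = new OneShotEmitter(received);
        emitter.onSuccess("a");
        emitter.onSuccess("b"); // ignored: already disposed by the first call
        return received;
    }

    static List<String> observable() {
        List<String> received = new ArrayList<>();
        MultiShotEmitter emitter = new MultiShotEmitter(received);
        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onComplete();
        emitter.onNext("c"); // ignored: disposed by onComplete
        return received;
    }

    public static void main(String[] args) {
        System.out.println(single());
        System.out.println(observable());
    }
}
```

  In the model, a second onSuccess is dropped, while onNext keeps delivering until onComplete closes the emitter.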
