RxJava: reading the source code of the create method

Author: 木头与琉璃 | Published: 2019-12-17 09:22

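    In everyday life we often say that a problem should be solved by someone doing something at a particular time and place. Suppose an incident occurs: the incident (Observable) should be handled by a specific person (Observer) from the responsible department (emitter), following some procedure (onNext), and the outcome is either success (onComplete) or failure (onError). The incident and the responsible department must first be connected (subscribe) before any handling can happen.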

    Source code walkthrough

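    • create takes an ObservableOnSubscribe, which is a functional interface, and returns an ObservableCreate, a concrete subclass of the abstract class Observable. ObservableCreate implements Observable's abstract method subscribeActual, mainly by means of the nested class CreateEmitter. subscribeActual is invoked from Observable.subscribe(Observer); it is what makes the Observable start emitting items to the Observer.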
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    
        public final class ObservableCreate<T> extends Observable<T> {
            final ObservableOnSubscribe<T> source;

            public ObservableCreate(ObservableOnSubscribe<T> source) {
                this.source = source;
            }

            @Override
            protected void subscribeActual(Observer<? super T> observer) {
                // One emitter per subscriber; it also serves as the Disposable.
                CreateEmitter<T> parent = new CreateEmitter<T>(observer);
                // The observer receives the Disposable before any item is emitted.
                observer.onSubscribe(parent);
                try {
                    // Run the user-supplied source against the emitter.
                    source.subscribe(parent);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    parent.onError(ex);
                }
            }
        }
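To make the call order in subscribeActual easier to follow, here is a minimal, dependency-free sketch of the same handshake. Every type in it (MiniCreateFlow, MiniObservable, MiniObserver, MiniEmitter, MiniOnSubscribe) is a hypothetical stand-in invented for illustration, not an RxJava class. It deliberately leaves out disposal, RxJavaPlugins hooks and the Exceptions.throwIfFatal check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are RxJava types.
class MiniCreateFlow {
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    interface MiniEmitter<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    @FunctionalInterface
    interface MiniOnSubscribe<T> {
        void subscribe(MiniEmitter<T> emitter) throws Exception;
    }

    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> source;

        private MiniObservable(MiniOnSubscribe<T> source) {
            this.source = source;
        }

        // Same shape as create: validate the argument, then wrap the source.
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> source) {
            if (source == null) {
                throw new NullPointerException("source is null");
            }
            return new MiniObservable<>(source);
        }

        // Same shape as subscribeActual: one fresh emitter per subscriber.
        void subscribe(final MiniObserver<T> observer) {
            MiniEmitter<T> emitter = new MiniEmitter<T>() {
                @Override public void onNext(T value) { observer.onNext(value); }
                @Override public void onError(Throwable error) { observer.onError(error); }
                @Override public void onComplete() { observer.onComplete(); }
            };
            observer.onSubscribe();        // 1. the observer is notified first
            try {
                source.subscribe(emitter); // 2. only then does the source body run
            } catch (Throwable ex) {
                emitter.onError(ex);       // 3. a thrown exception becomes onError
            }
        }
    }

    // Subscribes a logging observer and returns the events it saw, in order.
    static List<String> run(MiniOnSubscribe<Integer> source) {
        final List<String> log = new ArrayList<>();
        MiniObservable.create(source).subscribe(new MiniObserver<Integer>() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(Integer value) { log.add("onNext:" + value); }
            @Override public void onError(Throwable error) { log.add("onError:" + error.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext:1, onNext:2, onComplete]
        System.out.println(run(e -> { e.onNext(1); e.onNext(2); e.onComplete(); }));
        // prints [onSubscribe, onNext:1, onError:boom]
        System.out.println(run(e -> { e.onNext(1); throw new IllegalStateException("boom"); }));
    }
}
```

The point to take away is that nothing runs when create is called; the source body executes once per subscribe call, and always after onSubscribe.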
    
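    • CreateEmitter is a static nested class of ObservableCreate. It wraps the downstream Observer and stops forwarding events once it has been disposed or has terminated.
        static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {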
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
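
            // The remaining methods, such as setDisposable, setCancellable, serialize,
            // dispose and isDisposed, are omitted from this excerpt.
        }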
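The isDisposed checks above give the emitter "at most one terminal event" semantics. The sketch below imitates that guard without RxJava, under some loud assumptions: MiniGuardedEmitter is a hypothetical class, the delivered list stands in for the downstream Observer, the undeliverable list stands in for RxJavaPlugins.onError, and an AtomicBoolean replaces the AtomicReference<Disposable> state of the real class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the guard logic; not the RxJava implementation.
class MiniGuardedEmitter {
    final List<String> delivered = new ArrayList<>();     // stand-in for the Observer
    final List<String> undeliverable = new ArrayList<>(); // stand-in for RxJavaPlugins.onError
    private final AtomicBoolean disposed = new AtomicBoolean();

    boolean isDisposed() {
        return disposed.get();
    }

    void dispose() {
        disposed.set(true);
    }

    void onNext(String value) {
        if (value == null) {
            // A null item is turned into an error instead of being forwarded.
            onError(new NullPointerException("onNext called with null"));
            return;
        }
        if (!isDisposed()) {
            delivered.add("onNext:" + value);
        }
    }

    // Returns false when the error can no longer be delivered.
    boolean tryOnError(Throwable t) {
        if (!isDisposed()) {
            try {
                delivered.add("onError:" + t.getMessage());
            } finally {
                dispose(); // an error is terminal
            }
            return true;
        }
        return false;
    }

    // Unlike tryOnError, an undeliverable error is reported to the global hook.
    void onError(Throwable t) {
        if (!tryOnError(t)) {
            undeliverable.add(t.getMessage());
        }
    }

    void onComplete() {
        if (!isDisposed()) {
            try {
                delivered.add("onComplete");
            } finally {
                dispose(); // completion is terminal too
            }
        }
    }

    public static void main(String[] args) {
        MiniGuardedEmitter e = new MiniGuardedEmitter();
        e.onNext("a");
        e.onComplete();
        e.onNext("b"); // dropped: the emitter has already terminated
        boolean accepted = e.tryOnError(new RuntimeException("late"));
        e.onError(new RuntimeException("later"));
        System.out.println(e.delivered);     // prints [onNext:a, onComplete]
        System.out.println(accepted);        // prints false
        System.out.println(e.undeliverable); // prints [later]
    }
}
```

This is why a source that may fail after the downstream has gone away can prefer tryOnError: it reports the outcome as a boolean instead of pushing the error to the global handler.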
    
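    • ObservableOnSubscribe.subscribe takes an ObservableEmitter, which extends io.reactivex.Emitter. Of these, only ObservableOnSubscribe is a functional interface (it has a single abstract method); ObservableEmitter and Emitter each declare several methods. The main implementation of ObservableEmitter is the CreateEmitter shown above, whose job is to deliver data downstream.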
    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param emitter the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    public interface ObservableEmitter<T> extends Emitter<T> {
    
        /**
         * Sets a Disposable on this emitter; any previous {@link Disposable}
         * or {@link Cancellable} will be disposed/cancelled.
         * @param d the disposable, null is allowed
         */
        void setDisposable(@Nullable Disposable d);
    
        /**
         * Sets a Cancellable on this emitter; any previous {@link Disposable}
         * or {@link Cancellable} will be disposed/cancelled.
         * @param c the cancellable resource, null is allowed
         */
        void setCancellable(@Nullable Cancellable c);
    
        /**
         * Returns true if the downstream disposed the sequence or the
         * emitter was terminated via {@link #onError(Throwable)}, {@link #onComplete} or a
         * successful {@link #tryOnError(Throwable)}.
         * <p>This method is thread-safe.
         * @return true if the downstream disposed the sequence or the emitter was terminated
         */
        boolean isDisposed();
    
        /**
         * Ensures that calls to onNext, onError and onComplete are properly serialized.
         * @return the serialized ObservableEmitter
         */
        @NonNull
        ObservableEmitter<T> serialize();
    
        /**
         * Attempts to emit the specified {@code Throwable} error if the downstream
         * hasn't cancelled the sequence or is otherwise terminated, returning false
         * if the emission is not allowed to happen due to lifecycle restrictions.
         * <p>
         * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
         * if the error could not be delivered.
         * <p>History: 2.1.1 - experimental
         * @param t the throwable error to signal if possible
         * @return true if successful, false if the downstream is not able to accept further
         * events
         * @since 2.2
         */
        boolean tryOnError(@NonNull Throwable t);
    }
    
    public interface Emitter<T> {
    
        /**
         * Signal a normal value.
         * @param value the value to signal, not null
         */
        void onNext(@NonNull T value);
    
        /**
         * Signal a Throwable exception.
         * @param error the Throwable to signal, not null
         */
        void onError(@NonNull Throwable error);
    
        /**
         * Signal a completion.
         */
        void onComplete();
    }
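The Javadoc for setDisposable and setCancellable says that setting a new resource disposes or cancels the previous one. The following dependency-free sketch shows one way such a "replace and clean up" slot can work. MiniCancellableSlot and MiniCancellable are hypothetical names. Two details are assumptions on my part rather than something the excerpt above states: a resource set after disposal is cancelled immediately, and a checked exception from cancel is simply rethrown wrapped in a RuntimeException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of a replaceable cleanup slot; not an RxJava class.
class MiniCancellableSlot {
    @FunctionalInterface
    interface MiniCancellable {
        void cancel() throws Exception;
    }

    // Sentinel marking the slot as disposed; compared by reference only.
    private static final MiniCancellable DISPOSED = () -> { };

    private final AtomicReference<MiniCancellable> slot = new AtomicReference<>();

    // Installs next and cancels whatever was installed before it.
    void setCancellable(MiniCancellable next) {
        for (;;) {
            MiniCancellable current = slot.get();
            if (current == DISPOSED) {
                run(next); // assumption: too late, so clean up right away
                return;
            }
            if (slot.compareAndSet(current, next)) {
                run(current);
                return;
            }
        }
    }

    // Cancels the current resource once and marks the slot as disposed.
    void dispose() {
        MiniCancellable current = slot.getAndSet(DISPOSED);
        if (current != DISPOSED) {
            run(current);
        }
    }

    boolean isDisposed() {
        return slot.get() == DISPOSED;
    }

    private static void run(MiniCancellable c) {
        if (c == null) {
            return; // null is allowed and means "nothing to clean up"
        }
        try {
            c.cancel();
        } catch (Exception ex) {
            throw new RuntimeException(ex); // simplification for this sketch
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniCancellableSlot slot = new MiniCancellableSlot();
        slot.setCancellable(() -> log.add("close A"));
        slot.setCancellable(() -> log.add("close B")); // replaces A, so A is cancelled
        slot.dispose();                                 // cancels B
        slot.setCancellable(() -> log.add("close C")); // already disposed, C is cancelled
        System.out.println(log); // prints [close A, close B, close C]
    }
}
```

In a source passed to create, this kind of hook is typically where a listener is unregistered or a connection is closed when the downstream disposes.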
    

    Article link: https://www.haomeiwen.com/subject/eoprnctx.html