RxJava -> create() and subscribe()

Author: 冉桓彬 | Published: 2017-09-03 22:22

    1. example:

    Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtils.log(getClass(), "onSubscribe()");
            }
    
            @Override
            public void onNext(Integer value) {
                LogUtils.log(getClass(), "onNext()");
            }
    
            @Override
            public void onError(Throwable e) {
                LogUtils.log(getClass(), "onError()");
            }
    
            @Override
            public void onComplete() {
                LogUtils.log(getClass(), "onComplete()");
            }
        });
    
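    • An example of using RxJava on a single thread. The sections below walk through the source code to see what each API actually does and how these APIs relate to one another in the single-threaded case.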

    Type hierarchy:

    public abstract class Observable<T> implements ObservableSource<T>;
    public interface ObservableSource<T>;
    public interface Observer<T>;
    public interface ObservableOnSubscribe<T>;
    public interface ObservableEmitter<T> extends Emitter<T>;
    public interface Emitter<T>;
    

    create():

    public abstract class Observable<T> implements ObservableSource<T> {
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    }
    
    public final class RxJavaPlugins {
        public static <T> Observable<T> onAssembly(Observable<T> source) {
            return source;
        }
    }
    
    public interface ObservableOnSubscribe<T> {
        void subscribe(ObservableEmitter<T> emitter) throws Exception;
    }
    
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    }
    

    The code above does the following:

    • 1. Creates the ObservableOnSubscribe object and passes its reference to ObservableCreate;
    • 2. Creates the ObservableCreate object. ObservableCreate extends Observable and holds a reference to the ObservableOnSubscribe object;
    public abstract class Observable<T> implements ObservableSource<T>;
    public interface ObservableSource<T> {
        void subscribe(Observer<? super T> observer);
    }
    
    public interface Observer<T> {
        void onSubscribe(Disposable d);
    
        void onNext(T value);
    
        void onError(Throwable e);
    
        void onComplete();
    }
    
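    • As the code above shows, create() only performs initialization: it stores the ObservableOnSubscribe reference, and nothing is emitted until subscribe() is called;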
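    To make the "initialization only" point concrete, here is a minimal plain-Java sketch. It does not use RxJava: LazySource and LazyDemo are hypothetical names, and java.util.function.Consumer stands in for both ObservableOnSubscribe and the emitter. It only illustrates that the stored callback does not run at creation time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableCreate: it only stores the source callback.
final class LazySource<T> {
    // Plays the role of the ObservableOnSubscribe reference.
    private final Consumer<Consumer<T>> source;

    private LazySource(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    // Like create(): wraps the callback and runs nothing.
    static <T> LazySource<T> create(Consumer<Consumer<T>> source) {
        return new LazySource<>(source);
    }

    // Like subscribe(): only now is the stored callback invoked.
    void subscribe(Consumer<T> onNext) {
        source.accept(onNext);
    }
}

final class LazyDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazySource<Integer> src = LazySource.create(emitter -> {
            log.add("source runs");
            emitter.accept(1);
        });
        log.add("after create");                 // recorded before the source runs
        src.subscribe(v -> log.add("onNext " + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());               // [after create, source runs, onNext 1]
    }
}
```

    The "after create" entry is logged first, which matches the observation that the body of the source callback is deferred until subscription.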

    subscribe():

    .subscribe(new Observer<Integer>() {
         @Override
         public void onSubscribe(Disposable d) {}
    
         @Override
         public void onNext(Integer value) {}
    
         @Override
         public void onError(Throwable e) {}
    
         @Override
         public void onComplete() {}
    });
    
    public abstract class Observable<T> implements ObservableSource<T> {
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);
        }
    }
    
    public final class RxJavaPlugins {
        public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
            BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
            if (f != null) {
                return apply(f, source, observer);
            }
            return observer;
        }
    }
    
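    • 1. In testing, f == null, so onSubscribe() returns the observer unchanged;
    • 2. As seen in create(), subscribeActual() is actually implemented by ObservableCreate, the subclass of Observable;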
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
            source.subscribe(parent);
        }
    }
    
    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable ;
    
    public interface ObservableOnSubscribe<T> {
    
        void subscribe(ObservableEmitter<T> emitter) throws Exception;
    }
    

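    The link between the Observable (the observed side) and the Observer is actually established inside subscribeActual():

    • 1. CreateEmitter implements Disposable;
    • 2. The observer receives the CreateEmitter through onSubscribe(), where it is seen as a Disposable;
    • 3. ObservableOnSubscribe receives the same CreateEmitter through subscribe(), where it is seen as an ObservableEmitter;
    • 4. onSubscribe(Disposable d) is the first callback invoked;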
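    The wiring described above can be modeled in a few lines of plain Java. This is a simplified sketch, not RxJava's code: every Mini* type and WiringDemo are hypothetical names, error handling is left out, and an AtomicBoolean replaces the AtomicReference-based disposal. It shows one object handed to the observer as a disposable and to the source as an emitter, and the resulting callback order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified counterparts of Disposable, ObservableEmitter,
// Observer and ObservableOnSubscribe.
interface MiniDisposable { void dispose(); boolean isDisposed(); }
interface MiniEmitter<T> { void onNext(T value); void onComplete(); }
interface MiniObserver<T> {
    void onSubscribe(MiniDisposable d);
    void onNext(T value);
    void onComplete();
}
interface MiniOnSubscribe<T> { void subscribe(MiniEmitter<T> emitter); }

// One object, two views: the observer sees a MiniDisposable, the source sees a MiniEmitter.
final class MiniCreateEmitter<T> implements MiniEmitter<T>, MiniDisposable {
    private final MiniObserver<T> observer;
    private final AtomicBoolean disposed = new AtomicBoolean();

    MiniCreateEmitter(MiniObserver<T> observer) { this.observer = observer; }

    public void onNext(T value) {
        if (!isDisposed()) observer.onNext(value);
    }
    public void onComplete() {
        if (!isDisposed()) {
            try { observer.onComplete(); } finally { dispose(); }
        }
    }
    public void dispose() { disposed.set(true); }
    public boolean isDisposed() { return disposed.get(); }
}

final class MiniObservable<T> {
    private final MiniOnSubscribe<T> source;

    MiniObservable(MiniOnSubscribe<T> source) { this.source = source; }

    // Mirrors the steps of subscribeActual().
    void subscribe(MiniObserver<T> observer) {
        MiniCreateEmitter<T> parent = new MiniCreateEmitter<T>(observer);
        observer.onSubscribe(parent);   // the observer gets the disposable view first
        source.subscribe(parent);       // then the source gets the emitter view and emits
    }
}

final class WiringDemo {
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObservable<Integer> o = new MiniObservable<Integer>(emitter -> {
            emitter.onNext(1);
            emitter.onComplete();
        });
        o.subscribe(new MiniObserver<Integer>() {
            public void onSubscribe(MiniDisposable d) { log.add("onSubscribe"); }
            public void onNext(Integer value) { log.add("onNext " + value); }
            public void onComplete() { log.add("onComplete"); }
        });
        return log;   // [onSubscribe, onNext 1, onComplete]
    }
}
```

    Because onSubscribe() runs before the source is started, the observer already holds the disposable by the time the first value arrives.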

    CreateEmitter:

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    
        private static final long serialVersionUID = -3434801548987643227L;
    
        final Observer<? super T> observer;
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    
        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }
    
        @Override
        public void setCancellable(Cancellable c) {
             setDisposable(new CancellableDisposable(c));
        }
    
        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }
    
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
    
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
    
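      1. If dispose() has been called, the emitter's onNext(), onError(), and onComplete() calls still execute, but the Observer's onNext(), onError(), and onComplete() are no longer invoked (a late onError() is handed to RxJavaPlugins.onError() instead);
      2. Once the emitter has called onComplete() or onError(), it disposes itself, so none of the Observer's onXXX methods run again.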
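    Both rules follow from the isDisposed() guard in CreateEmitter. The plain-Java sketch below reproduces just that guard under simplifying assumptions: GuardedEmitter and GuardDemo are hypothetical names, a Consumer<String> stands in for the Observer, and onError() is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, simplified emitter: forwards events downstream only while not disposed.
final class GuardedEmitter<T> {
    private final Consumer<String> downstream;   // stands in for the Observer's callbacks
    private final AtomicBoolean disposed = new AtomicBoolean();

    GuardedEmitter(Consumer<String> downstream) { this.downstream = downstream; }

    void onNext(T value) {
        if (!isDisposed()) downstream.accept("onNext " + value);
    }

    void onComplete() {
        if (!isDisposed()) {
            // Terminal event: deliver it, then dispose so later events are dropped.
            try { downstream.accept("onComplete"); } finally { dispose(); }
        }
    }

    void dispose() { disposed.set(true); }
    boolean isDisposed() { return disposed.get(); }
}

final class GuardDemo {
    // Rule 1: the consumer disposes after the first value.
    static List<String> disposeEarly() {
        List<String> log = new ArrayList<>();
        GuardedEmitter<Integer> e = new GuardedEmitter<Integer>(log::add);
        e.onNext(1);
        e.dispose();
        e.onNext(2);        // the call runs, but nothing reaches downstream
        e.onComplete();     // dropped as well
        return log;         // [onNext 1]
    }

    // Rule 2: the source keeps emitting after onComplete().
    static List<String> emitAfterComplete() {
        List<String> log = new ArrayList<>();
        GuardedEmitter<Integer> e = new GuardedEmitter<Integer>(log::add);
        e.onNext(1);
        e.onComplete();
        e.onNext(2);        // ignored: onComplete() already disposed the emitter
        e.onComplete();     // ignored for the same reason
        return log;         // [onNext 1, onComplete]
    }
}
```

    In both cases the source side is free to keep calling the emitter; the guard simply stops forwarding.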

    Article link: https://www.haomeiwen.com/subject/wcfsjxtx.html