美文网首页
RxJava的一些理解

RxJava的一些理解

作者: 北雁南飞_8854 | 来源:发表于2017-11-19 21:35 被阅读0次

    一、Observable的创建

    //Observable的创建, 这里的Object类可以替换为任意类型。
    Observable<Object> observable =
            Observable.create(new ObservableOnSubscribe<Object>() {
                @Override
                public void subscribe(ObservableEmitter<Object> e) throws Exception {
                    e.onNext(/*Object*/value1);
                    e.onNext(/*Object*/value2);
                    ...
                    e.onComplete();
                }
            });
    
    //Observer订阅事件
    observable.subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onNext(Object o) {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
        }
    });
    

    1.1 被观察者(Observable)接口及实现类:

    (1) 接口ObservableSource

    public interface ObservableSource<T> {
        void subscribe(@NonNull Observer<? super T> observer);
    }
    

    (2) 接口ObservableSource的抽象实现类Observable

    public abstract class Observable<T> implements ObservableSource<T> {
    
        /* 实现ObservableSource接口。*/
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                ...
            }
        }
    
        protected abstract void subscribeActual(Observer<? super T> observer);
    
        /* 静态方法,用于创建Observable实例。*/
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    (3) Observable的具体实现类ObservableCreate

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
        static final class CreateEmitter<T> extends AtomicReference<Disposable>
                implements ObservableEmitter<T>, Disposable {
            private static final long serialVersionUID = -3434801548987643227L;
            final Observer<? super T> observer;
            CreateEmitter(Observer<? super T> observer) { this.observer = observer;}
             /* Emitter接口的实现。*/
            @Override
            public void onNext(T t) {
                ...
                observer.onNext(t);
            }
            @Override
            public void onError(Throwable t) {
                ...
                observer.onError(t);
            }
            @Override
            public void onComplete() {
                ...
                observer.onComplete();
            }
    
            /* Disposable接口的实现。*/
            @Override
            public void dispose() { ...}
            @Override
            public boolean isDisposed() {...}
        }
    }    
    

    1.2 被观察者(Observable)和观察者(Observer)之间的桥梁

    被观察者(Observable)持有ObservableOnSubscribe实例的引用, 参数ObservableEmitter持有观察者(Observer)实例的引用,这样通过ObservableOnSubscribe的subscribe(ObservableEmitter<T> e)方法,Observable和Observer之间就建立了联系。
    (1) 接口ObservableOnSubscribe

    /**
     * A functional interface that has a {@code subscribe()} method that receives
     * an instance of an {@link ObservableEmitter} instance that allows pushing
     * events in a cancellation-safe manner.
     *
     * @param <T> the value type pushed
     */
    public interface ObservableOnSubscribe<T> {
        void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
    }
    

    二、总结

    RxJava的使用可以概括为:

    1. 通过Observable.create(ObservableOnSubscribe<T> source)创建一个Observable实例,实际类型为ObservableCreate, 它持有对source的引用;
    2. 通过已创建的Observable实例,调用subscribe(Observer<? super T> observer)方法,在该方法中完成的主要工作:
    • 使用observer作参数构建CreateEmitter类的实例, 记为parent,parent持有对observer的引用;
    • 调用observer.onSubscribe(parent);
    • 通过ObservableCreate类的source成员, 调用source.subscribe(parent),该方法由用户自定义,一般包含:
      parent.onNext(Object);
      parent.onError(Throwable);
      parent.onComplete();
      最终分别会调用observer的onNext(Object),onComplete()和onError(Throwable)方法。

    三、操作符

    concatMap

    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
    

    说明:
    对源Publisher发射的每一个元素应用一个转换函数(transformation function),来生成一个新的Flowable实例,然后将新的Flowable实例按原序发给订阅者。
    举例:

    private void timeoutWithRetry() {
            Flowable
                    .just("red", "dark", "yellow", "green", "black", "blue")
                    .concatMap(new Function<String, Publisher<? extends String>>() {
                        @Override
                        public Publisher<? extends String> apply(String color) throws Exception {
                            Log.d(TAG, "applying " + color + ", thread: " + Thread.currentThread().getName());
                            return Flowable.just(color)
                                    .delay(color.length(), TimeUnit.SECONDS)
                                    .timeout(5, TimeUnit.SECONDS) //超时时间, 如果超时、且没有添加重试,则抛出TimeoutException.
                                    .retry(2); //Publisher在出现onError()时重试的次数, 重试之后再决定是否调用onError().
                        }
                    })
                    .subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, String.format("Received {%s} delaying for {%d}, thread: %s", s, s.length(), Thread.currentThread().getName()));
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.e(TAG, "accept exception:  ", throwable);
                        }
                    });
        }
    
    
    

    四、RxJava的线程控制

    1. 产生事件的代码(ObservableOnSubscribe接口)和doOnSubscribe()分别在它们后面最近的一个subscribeOn() 指定的Scheduler上执行,如果后面没有找到subscribeOn(),则在subscribe()的调用者所在的线程执行;
    2. 普通操作(map、filter等)和消费事件的代码(Consumer、Observer接口)在它们前面最近的一个observeOn指定的Scheduler上执行;如果它们前面没有observeOn了,那么它们就在整个调用链的第一个subscribeOn指定的Scheduler上执行;如果没找到subscribeOn调用,则在subscribe()的调用者所在的线程执行。

    相关文章

      网友评论

          本文标题:RxJava的一些理解

          本文链接:https://www.haomeiwen.com/subject/wvppvxtx.html