美文网首页
RxJava从使用到原理

RxJava从使用到原理

作者: 帝王鲨kingcp | 来源:发表于2018-12-27 15:52 被阅读0次
    创建操作符

    操作符使用

    1. 基本创建
      create() 完整创建1个被观察者对象(Observable)
    2. 快速创建,发送事件
      just() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的事件
      fromArray() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的数组数据
      fromIterable() 快速创建1个被观察者对象(Observable),发送事件的特点:直接发送 传入的集合List数据
      测试使用 empty() error() never()
    • 延迟创建
      defer() 直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
      timer() 快速创建1个被观察者对象(Observable),发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
      interval() 快速创建1个被观察者对象(Observable),发送事件的特点:每隔指定时间 就发送 事件,发送的事件序列 = 从0开始、无限递增1的的整数序列
      intervalRange() 快速创建1个被观察者对象(Observable),发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
      range() 快速创建1个被观察者对象(Observable), 发送事件的特点:连续发送 1个事件序列,可指定范围, 作用类似于intervalRange(),但区别在于:无延迟发送事件
      rangeLong() 类似于range(),区别在于该方法支持数据类型 = Long
    创建操作符.png
    Android RxJava:最基础的操作符详解 - 创建操作符
    变化操作符
    1. map() 对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件即,将被观察者发送的事件转换为任意的类型事件。
    2. flatmap() 将被观察者发送的事件序列进行拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关。
    3. ConcatMap() 类似FlatMap()操作符,拆分 & 重新合并生成的事件序列的顺序 = 被观察者旧序列生产的顺序
    4. Buffer() 定期从被观察者(Obervable)需要发送的事件中获取一定数量的事件 & 放到缓存区中,最终发送
      Android RxJava:图文详解 变换操作符
    组合/合并操作符
    1. concat() / concatArray() 组合多个被观察者一起发送数据,合并后按发送顺序串行执行
    2. merge() / mergeArray() 组合多个被观察者一起发送数据,合并后按时间线并行执行
    3. concatDelayError() / mergeDelayError() onError事件推迟到其他被被观察者发送事件结束后触发
    4. zip() 合并多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送。事件组合方式 = 严格按照原先事件序列进行对位合并,最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量。
    5. combineLatest() 当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据。
    6. combineLatestDelayError() 作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述。
    7. reduce() 把被观察者需要发送的事件聚合成1个事件 & 发送
    8. startWith() / startWithArray() 在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
    9. count() 统计被观察者发送事件的数量
      Android RxJava:组合 / 合并操作符 详细教程
    功能性操作符
    1. subscribe() 订阅,即连接观察者&被观察者
    2. 线程调度
    3. 延迟操作
    4. do() 在事件的生命周期中操作


      do操作符.png
    5. 错误处理


      错误操作符.png
    6. 重复发送
      repeat() 无条件地、重复发送 被观察者事件
      repeatWhen() 有条件地、重复发送 被观察者事件
      image.png
      Android RxJava:功能性操作符 全面讲解

    原理

    Single:如下面的代码做最简单的操作,创建被观察者,观察者以及相互之间订阅。
    //创建被观察者
    Single<String> single = Single.just("1");
    //创建观察者
    SingleObserver<String> observer = new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onSuccess(String s) {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    };
      //发生订阅关系
      single.subscribe(observer);
    

    just方法,返回一个包裹了Single的SingleJust。SingleJust继承Single类。

    public static <T> Single<T> just(final T item) {
            ObjectHelper.requireNonNull(item, "value is null");
            return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }
    

    被观察者订阅观察者,subscribe方法回去实现subscribeActual(subscriber),这个方法就在SingleJust中

     public final void subscribe(SingleObserver<? super T> subscriber) {
            ObjectHelper.requireNonNull(subscriber, "subscriber is null");
    
            subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
    
            ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
    
            try {
                subscribeActual(subscriber);
            } catch (NullPointerException ex) {
                throw ex;
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                NullPointerException npe = new NullPointerException("subscribeActual failed");
                npe.initCause(ex);
                throw npe;
            }
        }
    

    subscribleActual会去调用观察者SingleObserver的onSubscribe和onSuccess方法。其中onSubscribe还返回一个已经丢弃的丢弃对象Disposables,Disposable会在下面讲。

    public final class SingleJust<T> extends Single<T> {
    
        final T value;
    
        public SingleJust(T value) {
            this.value = value;
        }
    
        @Override
        protected void subscribeActual(SingleObserver<? super T> s) {
            s.onSubscribe(Disposables.disposed());
            s.onSuccess(value);
        }
    
    }
    
    
    Create作为和just一样的创建操作符,其实流程是相似的,下面是流程图
    create创建原理.png
    Map操作符使用如下,能将just中的内容进行变换后往下传递。
            SingleJust.just(1)
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer integer) throws Exception {
                            return String.valueOf(integer);
                        }
                    })
                    .subscribe(new SingleObserver<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onSuccess(String integer) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
                    });
    

    进入map方法,让后会返回一个新创建的SingleMap。

     public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
    }
    

    在SingleMap类当中,subscribeActual中上游source订阅一个新的观察者MapSingleObserver,source.subscribe(new MapSingleObserver<T, R>(t, mapper)),这样就是让SingleJust去调用它自己的subscribeActual(),这样整个过程启动了。MapSingleObserver包裹一个我们创建的SingleObserver(t),看MapSingleObserver类,内部做的就是一个桥接和自己apply的工作。代码和整体流程图在下面。

    public final class SingleMap<T, R> extends Single<R> {
        final SingleSource<? extends T> source;
    
        final Function<? super T, ? extends R> mapper;
    
        public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }
    
        @Override
        protected void subscribeActual(final SingleObserver<? super R> t) {
            source.subscribe(new MapSingleObserver<T, R>(t, mapper));
        }
    
        static final class MapSingleObserver<T, R> implements SingleObserver<T> {
    
            final SingleObserver<? super R> t;
    
            final Function<? super T, ? extends R> mapper;
    
            MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
                this.t = t;
                this.mapper = mapper;
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                t.onSubscribe(d);
            }
    
            @Override
            public void onSuccess(T value) {
                R v;
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(e);
                    return;
                }
    
                t.onSuccess(v);
            }
    
            @Override
            public void onError(Throwable e) {
                t.onError(e);
            }
        }
    }
    
    
    带map操作符的流程图.png
    Disposable:作用主要就是让上游停止工作。实现方式主要有两种方式:桥接,替换。这两种方式我会具体讲。用法一般就是全局定义一个Disposable,在onSubscribe(Disposable d)中获得disposable对象,在生命周期的onDestroy中去丢弃。
    1. Single.just()没有延迟没有后续,直接传一个已经丢弃的丢弃对象,相当于传了一个没有用的对象。
    public final class SingleJust<T> extends Single<T> {
    
        @Override
        protected void subscribeActual(SingleObserver<? super T> s) {
            s.onSubscribe(Disposables.disposed());
            s.onSuccess(value);
        }
    
    }
    
    2. delay(),有的延迟丢弃,看看是如何完成的。
        public final Single<T> delay(long time, TimeUnit unit, boolean delayError) {
            return delay(time, unit, Schedulers.computation(), delayError);
        }
    

    创建返回一个SingleDelay

        public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler, boolean delayError) {
            ObjectHelper.requireNonNull(unit, "unit is null");
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler, delayError));
        }
    

    进入SingleDelay类,重点还是看subscribeActual,创建一个SequentialDisposable,通过 s.onSubscribe(sd)方法,让下游的SingleObserver能够拿到sd。上游source,用subscribe(new Delay(sd, s))进行启动事件。重点看一下Delay类,当上游调用onSubscribe(Disposable d)方法时,sd.replace(d)将sd替换成上游的d。当上游调用onSuccess(final T value)方法时, sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit))将sd替换成在执行延迟的disposable。同样onError也是一样。
    总结来说,delay用替换的方式去传递丢弃事件disposable。

    public final class SingleDelay<T> extends Single<T> {
    
        final SingleSource<? extends T> source;
        final long time;
        final TimeUnit unit;
        final Scheduler scheduler;
        final boolean delayError;
    
        public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
            this.source = source;
            this.time = time;
            this.unit = unit;
            this.scheduler = scheduler;
            this.delayError = delayError;
        }
    
        @Override
        protected void subscribeActual(final SingleObserver<? super T> s) {
    
            final SequentialDisposable sd = new SequentialDisposable();
            s.onSubscribe(sd);
            source.subscribe(new Delay(sd, s));
        }
    
        final class Delay implements SingleObserver<T> {
            private final SequentialDisposable sd;
            final SingleObserver<? super T> s;
    
            Delay(SequentialDisposable sd, SingleObserver<? super T> s) {
                this.sd = sd;
                this.s = s;
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                sd.replace(d);
            }
    
            @Override
            public void onSuccess(final T value) {
                sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
            }
    
            @Override
            public void onError(final Throwable e) {
                sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
            }
    
            final class OnSuccess implements Runnable {
                private final T value;
    
                OnSuccess(T value) {
                    this.value = value;
                }
    
                @Override
                public void run() {
                    s.onSuccess(value);
                }
            }
    
            final class OnError implements Runnable {
                private final Throwable e;
    
                OnError(Throwable e) {
                    this.e = e;
                }
    
                @Override
                public void run() {
                    s.onError(e);
                }
            }
        }
    }
    
    带延迟的disposable流程.png
    3. subscribeOn,observeOn线程转换操作符,subscribeOn是切换上游的线程,observeOn是切换下游的线程。

    不断跟进SubscribeOn(),最后会到SingleSubscribeOn类中,看一下subscribeActual方法,scheduler.scheduleDirect(parent)切线执行上游任务。如果没有线程的再次切换,后续任务将在这个线程中一直执行下去。

    public final class SingleSubscribeOn<T> extends Single<T> {
        final SingleSource<? extends T> source;
    
        final Scheduler scheduler;
    
        public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }
    
        @Override
        protected void subscribeActual(final SingleObserver<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
            s.onSubscribe(parent);
    
            Disposable f = scheduler.scheduleDirect(parent);
    
            parent.task.replace(f);
    
        }
    
        static final class SubscribeOnObserver<T>
        extends AtomicReference<Disposable>
        implements SingleObserver<T>, Disposable, Runnable {
    
            private static final long serialVersionUID = 7000911171163930287L;
    
            final SingleObserver<? super T> actual;
    
            final SequentialDisposable task;
    
            final SingleSource<? extends T> source;
    
            SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
                this.actual = actual;
                this.source = source;
                this.task = new SequentialDisposable();
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
    
            @Override
            public void onSuccess(T value) {
                actual.onSuccess(value);
            }
    
            @Override
            public void onError(Throwable e) {
                actual.onError(e);
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(this);
                task.dispose();
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            @Override
            public void run() {
                source.subscribe(this);
            }
        }
    
    }
    

    同样不断跟进observeOn(),最后进入SingleObserveOn类,查看subscribeActual()方法,source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler))具体查看ObserveOnSingleObserver,在onSuccess方法中,进行线程切换scheduler.scheduleDirect(this)。所以observeOn用来切换下游方法。

    public final class SingleObserveOn<T> extends Single<T> {
    
        final SingleSource<T> source;
    
        final Scheduler scheduler;
    
        public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }
    
        @Override
        protected void subscribeActual(final SingleObserver<? super T> s) {
            source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
        }
    
        static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
        implements SingleObserver<T>, Disposable, Runnable {
            private static final long serialVersionUID = 3528003840217436037L;
    
            final SingleObserver<? super T> actual;
    
            final Scheduler scheduler;
    
            T value;
            Throwable error;
    
            ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
                this.actual = actual;
                this.scheduler = scheduler;
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                if (DisposableHelper.setOnce(this, d)) {
                    actual.onSubscribe(this);
                }
            }
    
            @Override
            public void onSuccess(T value) {
                this.value = value;
                Disposable d = scheduler.scheduleDirect(this);
                DisposableHelper.replace(this, d);
            }
    
            @Override
            public void onError(Throwable e) {
                this.error = e;
                Disposable d = scheduler.scheduleDirect(this);
                DisposableHelper.replace(this, d);
            }
    
            @Override
            public void run() {
                Throwable ex = error;
                if (ex != null) {
                    actual.onError(ex);
                } else {
                    actual.onSuccess(value);
                }
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
        }
    }
    
    

    箭头颜色代表线程


    线程切换.png

    三件套缺一不可:

    OkHttp从使用到原理
    Retrofit从使用到原理
    RxJava从使用到原理

    真诚推荐

    下面的三篇文章我觉的分析的很好,特别是下面这张图流程很到位。
    基本流程及Rxjava中的设计模式
    线程切换subscribeOn
    线程切换observerOn

    线程切换subscribeOn

    相关文章

      网友评论

          本文标题:RxJava从使用到原理

          本文链接:https://www.haomeiwen.com/subject/kaobdftx.html