美文网首页安卓RxJava
RxJava2学习总结——操作符和应用场景

RxJava2学习总结——操作符和应用场景

作者: Dengszzzzz | 来源:发表于2019-07-23 12:59 被阅读0次

    前言

    2017年,刚毕业的我在项目组里打下手,组里其他人决定在新项目里使用RxJava,那时网络库用的是他们自己再次封装了的noHttp,加上了RxJava后,以前简单的网络请求多写了很多代码,这是我对RxJava的第一感受。
    如今的项目基本上都是MVP+Retrofit2+Rxjava2的框架,但往往只是把Rxjava2和Retrofit2搭配进行网络请求而已,我也不例外。也曾看过好几次RxJava2的入门教程、操作符总结等,过段时间就忘了,而且不知在什么样的情景下适用恰当,所以写下这篇文章,打算带着问题出发,重新整理一下。
    问题:
    1、RxJava是什么,它优势是什么?
    2、RxJava基础知识。
    3、RxJava2操作符。
    4、背压策略。
    5、具体的使用场景。

    1、RxJava是什么,它优势是什么?

    RxJava官方介绍——一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

    Rxjava本质就是异步,它的优势是简洁。简洁是指相比于AsyncTask 、Handler、runOnUiThread等异步操作,RxJava的逻辑更简洁。一方面它是链式的,逻辑代码直观,二是它提供了很多强大的操作符。

    2、RxJava基础知识

    RxJava的基础知识,首先要知道它是用了可拓展的观察者模式,然后再了解这些概念,Observable、Observer、subscribe、处理事件、Scheduler。

    1)什么是观察者模式?
    观察者(Observer)不时刻盯着被观察者(Observable),而是通过注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者我需要你的某某状态,你要在它变化的时候通知我。
    举例1:
    A(观察者)和B(被观察者),A不需要每过 1s 就检查一次 B 的状态,而是B状态发生改变时,去通知A。
    举例2:
    View的点击事件,View(被观察者)->OnClickLisetener(观察者)->setOnClickListener(订阅)->OnClick(事件)。
    举例3:
    小偷在偷钱时告诉警察它要偷钱。小偷(被观察者)、警察(观察者)、小偷偷钱是某种状态、警察把小偷抓起来(事件)。

    2)RxJava的观察者模式
    Observable(被观察者)、Observer(观察者)、 subscribe(订阅)、事件。
    看代码清楚明了,ObservableEmitter是事件发射器,在onSubscribe方法里可得到Disposable,可用于取消订阅。在观察者的方法体里有4个事件,解释看注释。如果发送了onError() 或 onComplete()事件,就取消了这个订阅。
    一般用于网络请求时,都会写一个继承Observer的类来做处理,比如在onSubscribe()里做加载框显示,在onComplete()做某些共性判断,在onError()做统一失败处理等。所以Observer的4个方法肯定是要清楚的。

    /**
    * 打印结果:  只会打印1和2,3不打印
    */
    Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //ObservableEmitter类,事件发射器,作用是定义需要发送的事件 & 向观察者发送事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
                emitter.onNext(3);
            }
        }).subscribe(new Observer<Integer>() {
    
            private Disposable mDisposable;  //用于取消订阅
    
            /**
             * 订阅成功就执行的方法
             * @param d  可切断操作
             */
            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
            }
    
            /**
             * 接收事件
             * @param integer
             */
            @Override
            public void onNext(Integer integer) {
                KLog.d(TAG, "onNext: " + integer + "\n");
            }
    
            /**
             * 失败,不再接收事件
             * @param e
             */
            @Override
            public void onError(Throwable e) {
    
            }
    
            /**
             * 完成,不再接收事件
             */
            @Override
            public void onComplete() {
            }
        });
    

    3)线程控制Scheduler
    Scheduler是线程调度,RxJava通过它来指定每一段代码运行在什么样的线程。例如网络请求用io流线程,更新UI就用mainThread,最常用也是这两个。
    Schedulers.newThread():新线程。
    Schedulers.io():在io()内部有线程池,可以使用空闲的线程,多数情况下比newThread()效率高。
    Schedulers.computation():计算线程(没用过)。
    AndroidSchedulers.mainThread():主线程。

    4)取消订阅
    RxJava2的取消订阅用Disposable.cancel(),如果订阅事件过多,要写管理类来管理。如果不取消订阅,就会造成内存泄漏,网上有RxLifecycle第三方库,可以很方便做取消订阅,它绑定了Activity or Fragment的生命周期。

    3、RxJava2操作符

    这里重新整理了一下当初学习的操作符,原本打算整理全部的操作符的,然后发现实在是太多了,就放弃了,下面的操作符绝大多数都来自于Android Rxjava:这是一篇 清晰 & 易懂的Rxjava 入门教程,想详细学习的可看这篇文章。网上也有很多文章有对操作符做简述,这篇文章比较完整Android拾萃 - RxJava2操作符汇总。最好结合代码学习,下载地址在本文最下面。

    1)创建操作符
    包括完整&快速创建被观察者,定时操作,周期性操作,数据/集合遍历

    操作符 简述
    基本创建
    create() 普通创建,可以定义要发送的事件
    快速创建
    just() 快速创建一个Observable,最多发送10个参数
    fromArray() 和just()类似,区别是传的是数组
    fromIterable() 和fromArray()类似,区别是传入list集合
    never() 不发送事件
    empty() 仅发送complete事件
    error() 仅发送empty事件
    延迟创建
    defer() 直到订阅才创建Observable
    timer() 延迟一段时间再才发送
    interval() 周期性发送,值从0开始递增。
    intervalRange() 指定范围,周期性发送。可用于做验证码倒计时
    range() 连续发送一个时间序列,可指定范围

    2)变换操作符

    操作符 简述
    map() 将被观察者发送的事件转换为任意的类型事件。返回的是结果集,适用于一对一转换。(数据类型转换)
    flatMap() 将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。返回的是包含结果集的Observable,适用于一对多,多对多得场景。
    concatMap() 和flatMap()类似,区别是flatMap是无序的,concatMap是有序的,它的新事件序列和旧序列顺序一致。具体可以在flatMap生成事件的逻辑里加个延迟看到差异。
    buffer() 定期从被观察者需要发送的事件中获取一定数量的事件,放到缓存区中,最终发送。两个参数,count是缓存区大小,skip是步长。
    switchMap() 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
    scan() 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
    groupBy() 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据

    3)组合/合并转换符

    操作符 简述
    组合多个被观察者
    concat() / concatArray() 组合多个被观察者,按发送顺序串行执行。有Array是指传数组
    merge()/mergeArray() 组合多个被观察者,按时间线并行执行
    concatDelayError()/mergeDelayError() onError事件推迟到其他被观察者发送事件结束后才触发
    合并多个事件
    zip() 按数量合并,最终事件的数量=多个观察者中事件最少的数量。比如被观察者1有4个事件,被观察者2有5个事件,则组合起来事件数量是4。
    combineLatest() 按时间合并,和最新事件合并。当两个Observable中的任何一个发送了事件后,将先发送了数据的Obervable的最新一个事件与另一个Observable发送的每个事件结合,最终基于该函数的结果发送事件。和zip的区别是,zip()按个数合并,1对1合并;combineLastest 按时间合并,在同一时间点上合并。
    reduce() 把被观察者需要发送的事件聚合成1个事件
    collect() 将被观察者Observable发送的数据事件收集到一个数据结构里
    其他
    startWith() 在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
    count() 统计被观察者发送事件的数量

    4)功能性操作符

    操作符 简述
    subscribe() 订阅,即连接观察者 & 被观察者
    delay() 使得被观察者延迟一段时间再发送事件
    doXXX() 各个事件操作符,如 doOnEach、doOnNext、doComplete等。
    retry() 重试,即当出现错误时,让被观察者(Observable)重新发射数据
    retryUntil() Observable遇到错误时,在retryUntil()的方法里决定,是否让Observable重新订阅
    retryWhen() retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,由这个Observable来决定是否要重新订阅原Observable。
    repeat() 无条件地、重复发送 被观察者事件
    repeatWhen( int ) 传入参数 = 重复发送次数有限

    5)过滤操作符

    操作符 简述
    filter() 过滤 特定条件的事件
    ofType() 过滤 特定数据类型的数据
    skip( int ) 跳过n个事件
    skipLast( int ) 跳过最后的n个事件
    distinct() 过滤事件序列中重复的事件
    distinctUntilChanged() 只确保相邻元素不重复出现
    take( int ) 指定只接收n个事件
    takeLast( int ) 指定只接收最后发送的n个事件

    6)条件/布尔操作符

    操作符 简述
    all() 判断发送的每项数据是否都满足设置的函数条件,若满足,返回 true;否则,返回 false。
    takeWhile() 判断发送的每项数据是否满足设置函数条件,为true就发送,为false不发送,且终止发送事件(后面的事件也不发送了)。
    skipWhile() 判断发送的每项数据是否满足设置函数条件,为true就不发送,为false发送,且终止发送事件。
    takeUntil() 接收第一个Observable(调用takUtil的Observable)发送的数据,当第二个Observable(takUtil参数中的Observable)发送数据时,两个Obserable会同时取消订阅。
    skipUntil() 与takeUtil()正好相反,不接收第一个Observable发送的数据,直到第二个Observable发送数据时才接收第一个Observable的数据,此时第二个Observable会取消订阅。
    sequenceEqual() 判定两个Observables需要发送的数据是否相同,相同返回 true,不相同返回 false
    contains() 判断发送的数据中是否包含指定数据。
    isEmpty() 判断发送的数据是否为空
    amb() 当需要发送多个 Observable时,只发送最先发送的Observable的数据,而其余Observable则被丢弃。

    4.背压策略。

    当在异步订阅中,通过Observable发射、处理、响应数据流时,如果事件产生的速度远远快于事件消费的速度,这些没来得及处理的数据就会越积越多,这些数据不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,缓存池的数据一直得不到处理,最终会导致OOM等异常。这就是响应式编程中的背压问题。总结一下就是: 事件产生的速度大于事件消费的速度,数据堆积,最终造成OOM等异常。
    RxJava2把对背压问题的处理逻辑从Observable中抽取出来产生了新的可观察对象Flowable,它是在Observable基础上做了优化,所以Observable能做的,它都能做,但是加了背压支持和其他的逻辑处理,它的效率比Observable慢得多,所以在需要用到背压的时候再用Flowable,其他时候还是正常使用Observable。网上找了两张图,可以很直观地看出RxJava1和2对背压的改动。


    RxJava1.png
    RxJava2.png

    简单举个例子:

    Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    
                    //异步订阅时,代表的是 异步缓存池中可放入数据的数量,一开始是128,当产生10个事件而没有消费时,此时这个值是128-10=118。
                    KLog.d(TAG, "异步缓存池中可放入数据的数量 = " + emitter.requested());
    
                    // 一共发送4个事件
                    KLog.d(TAG, "发送事件 1");
                    emitter.onNext(1);
                    KLog.d(TAG, "发送事件 2");
                    emitter.onNext(2);
                    KLog.d(TAG, "发送事件 3");
                    emitter.onNext(3);
                    KLog.d(TAG, "发送事件 4");
                    emitter.onNext(4);
                    KLog.d(TAG, "发送事件 onComplete()");
                    emitter.onComplete();
    
                    KLog.d(TAG, "异步缓存池中可放入数据的数量 = " + emitter.requested());
    
    //                //模拟缓存超过128
    //                for (int i = 0;i< 129; i++) {
    //                    Log.d(TAG, "发送了事件" + i);
    //                    emitter.onNext(i);
    //                }
    //                emitter.onComplete();
    //
                }
            }, BackpressureStrategy.ERROR)  //缓存区超过128,直接抛出异常
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            // 在异步订阅情况下,一定要调用request,否则下流不接收事件
                            // 只接收多少个事件
                            s.request(3);
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            KLog.d(TAG, "接收到了事件" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            KLog.d(TAG, "onError:", t);
                        }
    
                        @Override
                        public void onComplete() {
                            KLog.d(TAG, "onComplete");
                        }
                    });
    

    可以看到Flowable和Subscriber的使用方式和之前的Observable和Observer极其类似,不同点如下:
    1、create方法中多了一个BackpressureStrategy类型的参数。
    2、Flowable发射数据时,使用FlowableEmitter,而Observable用的是ObservableEmitter。
    3、Subscriber中,方法onSubscribe回调的参数不是Disposable而是Subscription。
    下面对这三点进行说明。

    BackpressureStrategy
    缓存策略,也既是当缓存区存满、上流仍然继续发送下事件时,该如何处理的策略。默认缓存区大小是128(与Flowable的buffersize大小有关)。具体有哪些策略如下:
    BackpressureStrategy.ERROR: 直接抛异常。
    BackpressureStrategy.MISSING: 友好提示:缓存区满了。
    BackpressureStrategy.BUFFER: 将缓存区设为无限大。
    BackpressureStrategy.DROP: 超过缓存区大小128的事件丢弃。
    BackpressureStrategy.LATEST: 只保存最后事件,超过缓存区大小128的事件丢弃。

    Subscription
    Subscription比Disposable多了一个方法request()。它的作用是告诉上游,下游需要多少数据,如果不设置request默认是0。比如设置为3,那么超过范围之外的数据就不接收了。如果多次调用request(),会累加。

    FlowableEmitter
    它比ObservableEmitter多了一个方法requested(),这个方法返回的是异步缓存池中可放入数据的数量,比如一开始是128,当产生10个事件而没有消费时,此时这个值是128-10=118。

    Tips:不管Subscription.request(xx)设置了什么值,FlowableEmitter都会发送事件的,发送了不接收的就放入缓存里。背压在异步订阅中才有用,如果是同步订阅,是不会有缓存池的。

    5.具体的使用场景

    实际项目中,我只用来做过验证码倒计时、网络请求,RxBus。虽然了解了一些操作符,但网上的教程讲解这些操作符的时候都是以简单的例子来讲解的,具体的应用场景还是比较玄乎。

    1)RxBus
    EventBus是一个基于发布/订阅的事件总线,它简化了组件之间的通信操作。而这些RxBus都能做,所以用RxBus替换EventBus,在RxJava1就已经提出来了,如果自己的事件发送的要求不高,可以自己封装一个RxBus使用。17年的时候找的一篇文章 Android 用RxJava模拟一个EventBus ———RxBus,我在这基础上稍微改动了一下,但是没有做粘性事件。
    原理简单来说就是找个容器装所有的观察者,当有某个事件产生时,找到所有需要这个事件的观察者,向它们发送事件。这个是否发送事件的依据,其实就是看观察者要什么样的class,比如订阅String类型的,那么Integer类型的事件就不会发给它。

    两个新知识:
    1)CompositeDisposable
    一个disposable的容器,可以容纳多个disposable
    2)Subject
    Subject可以同时代表 Observer 和 Observable,允许从数据源中多次发送结果给多个观察者。

    Subject 类别 简述
    AsyncSubject 只有当 Subject 调用 onComplete 方法时,才会将 Subject 中的最后一个事件传递给所有的 Observer。
    BehaviorSubject 当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。Rxlifecycle2库用的就是这个。
    PublishSubject 不会改变事件的发送顺序,在已经发送了一部分事件之后注册的 Observer 不会收到之前发送的事件。RxBus用的这个。
    ReplaySubject 无论什么时候注册 Observer 都可以接收到任何时候通过该 Observable 发射的事件。 RxBus粘性事件可用这个。
    UnicastSubject 只允许一个 Observer 进行监听,在该 Observer 注册之前会将发射的所有的事件放进一个队列中, 并在 Observer 注册的时候一起通知给它。
    public class RxBus {
    
        /**
         * CompositeDisposable定义:
         * 一个disposable的容器,可以容纳多个disposable,添加和去除的复杂度为O(1)
         *
         * 此处使用目的:
         * 是为了一个订阅者能够对应多个Disposable,在需要的时候调用 Disposable 的 dispose()取消订阅。
         *
         * 举个例子:
         * XXActivity,订阅了事件A和事件B,关闭时要取消订阅。
         * 那么只需在mSubscriptionMap里找到key为XXActivity的Value(CompositeDisposable),再取出Disposable
         * 取消订阅即可。
         *
         * 如果不用的话,就要为每一个Activity写一个容器去保存它的订阅的事件了。
         * */
        private HashMap<String, CompositeDisposable> mSubscriptionMap;
        private static volatile RxBus mRxBus;
        private final Subject<Object> mSubject;
    
        public static RxBus getIntanceBus(){
            if (mRxBus==null){
                synchronized (RxBus.class){
                    if(mRxBus==null){
                        mRxBus = new RxBus();
                    }
                }
            }
            return mRxBus;
        }
    
        
        /**
         * 正常订阅可用PublishSubject、黏性事件可用ReplaySubject。
         */
        private RxBus(){
            mSubject = PublishSubject.create().toSerialized();
        }
    
        /**
         * 一个默认的订阅方法
         * @param <T>
         * @param type   过滤,只返回特定数据类型的数据
         * @param next   next()事件,正常接收的事件
         * @param error  error()事件,错误接收的事件
         * @return
         */
        public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
            return getObservable(type)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(next,error);
        }
    
        /**
         * 返回指定类型的带背压的Flowable实例
         * 这里选定背压模式是 BackpressureStrategy.BUFFER,缓存区无限大
         * @param <T>
         * @param type  过滤,只返回特定数据类型的数据
         * @return
         */
        public <T> Flowable<T> getObservable(Class<T> type){
            return mSubject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
        }
    
        /**
         * 发送事件
         * @param o 事件
         */
        public void post(Object o){
            mSubject.onNext(o);
        }
    
    
        /**
         * 判断是否已有观察者订阅
         *
         * @return
         */
        public boolean hasObservers() {
            return mSubject.hasObservers();
        }
    
        /**
         * 保存订阅后的disposable,取消订阅的时候要用
         * @param o       订阅的目标
         * @param disposable
         */
        public void addSubscription(Object o, Disposable disposable) {
            if (mSubscriptionMap == null) {
                mSubscriptionMap = new HashMap<>();
            }
            //这里key值取订阅目标的实体名称(com.xx.xx.xxx),不是底层类检称
            String key = o.getClass().getName();
            //disposable 放到对应的 CompositeDisposable 里
            if (mSubscriptionMap.get(key) != null) {
                mSubscriptionMap.get(key).add(disposable);
            } else {
                CompositeDisposable disposables = new CompositeDisposable();
                disposables.add(disposable);
                mSubscriptionMap.put(key, disposables);
            }
        }
    
        /**
         * 取消订阅
         * @param o
         */
        public void unSubscribe(Object o) {
            if (mSubscriptionMap == null) {
                return;
            }
            String key = o.getClass().getName();
            if (!mSubscriptionMap.containsKey(key)){
                return;
            }
            if (mSubscriptionMap.get(key) != null) {
                mSubscriptionMap.get(key).dispose();
            }
            mSubscriptionMap.remove(key);
        }
    
        /********************************* 为RxEventBean 封装  **********************************/
        /**
         * 使用EventBus时,为了方便查找,一般都会封装 EventBean(int code,Object content)
         * 但是所有的订阅者都是订阅的这个类型,所以要自己做判断做类型转换。
         *
         * 这里写死事件是RxEventBean,且错误处理一般不处理,只用处理onNext即可。
         * @param context
         * @param action
         * */
        public void register(Context context, Consumer<RxEventBean> action) {
            Disposable disposable = RxBus.getIntanceBus().doSubscribe(RxEventBean.class, action,
                    throwable -> KLog.e("RxEventBean onError()", throwable.toString()));
            RxBus.getIntanceBus().addSubscription(context,disposable);
        }
    
    
        /**
         * 发送RxEventBean 事件
         * @param code     code,用于判断
         * @param content  内容,接收后做类型转换
         */
        public void post(int code, Object content){
            RxEventBean<Object> event = new RxEventBean<>();
            event.code = code;
            event.content = content;
            post(event);
        }
    }
    

    2)验证码倒计时
    其实关键代码就是用intervalRange()操作符。

    public class RxCodeHelper {
    
        private Context mContext;
        private TextView codeBt;
        private Disposable mDisposable;
    
        /**
         * 构造函数
         * @param mContext 上下文
         * @param codeBt   验证码的button
         */
        public RxCodeHelper(Context mContext, TextView codeBt) {
            this.mContext = mContext;
            this.codeBt = codeBt;
        }
    
        /**
         * 开启倒计时
         */
        public void start(){
            codeBt.setEnabled(false);
            //从0开始,走60个数,延迟是0s,周期为1次。
            Observable.intervalRange(0,60,0,1, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            mDisposable = d;
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            codeBt.setText((60 - aLong) + "s后可重发");
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            //倒计时完毕置为可点击状态
                            codeBt.setEnabled(true);
                            codeBt.setText("获取验证码");
                        }
                    });
        }
    
        /**
         * 请求失败时,重置状态
         * 取消倒计时的订阅事件
         */
        public void reset(){
            if(mDisposable!=null){
                mDisposable.dispose();
            }
            codeBt.setEnabled(true);
            codeBt.setText("获取验证码");
        }
    
        /**
         * 界面销毁时,取消倒计时订阅事件
         */
        public void stop(){
            if(mDisposable!=null){
                mDisposable.dispose();
            }
        }
    
    }
    

    3)RxLifecycle
    目的:解决RxJava的内存泄漏问题。
    用法:
    bindUntilEvent(@NonNull ActivityEvent event)——绑定指定的生命周期,在指定生命周期时取消订阅。
    bindToLifecycle() ——绑定生命周期,取消订阅策略可看源码。以Activity为例,可看RxLifecycleAndroid 下的 ACTIVITY_LIFECYCLE, 实际上在onCreate()订阅的,在onDestroy()取消订阅;在onResume()订阅的,在onPause()取消订阅。

    知识点:
    compose():
    将一种类型的Observable转换成另一种类型的Observable,保证调用的链式结构。
    LifecycleTransformer:
    LifecycleTransformer实现了各种Transformer接口,能够将一个Observable/Flowable/Single/Completable/Maybe对象转换成另一个 Observable/Flowable/Single/Completable/Maybe对象。正好配合上文的compose操作符,使用在链式调用中。

    举例在网络请求时绑定生命周期,mvp模式。
    Activity继承了RxFragmentActivity后,BaseView接口里新增两个方法,其他View层接口继承它。

    public interface BaseView {
    //为了让 IView 可以调用 RxLifeCycle的生命周期绑定
    <T> LifecycleTransformer<T> bindToLifecycle();
    <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event);
    }
    

    在P层使用,此处模拟网络请求,3s后回调

    Observable.timer(3, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io()) //订阅在io线程
    .observeOn(AndroidSchedulers.mainThread()) //回调在主线程
    .compose(mView.bindUntilEvent(ActivityEvent.DESTROY)) //指定在onDestroy销毁
    // .compose(mView.bindToLifecycle()) //取消订阅交由RxLifeCycle来判断
    .subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
    mView.showLoading();
    }
    
    @Override
    public void onNext(Long aLong) {
    KLog.d("onTest1Success()回调成功");
    mView.onTest1Success("onTest1 请求成功");
    }
    
    @Override
    public void onError(Throwable e) {
    mView.dismissLoading();
    }
    
    @Override
    public void onComplete() {
    mView.dismissLoading();
    }
    });
    

    一般使用,可以用compose封装一下,如下:

    /**
         * 统一线程处理,且绑定生命周期
         * 用法: xxx .compose(RxUtil.rxSchedulerHelper(mView))
         * @param view
         * @param <T>
         * @return
         */
        public static <T> ObservableTransformer<T,T> rxSchedulerHelper(BaseView view){
            return new ObservableTransformer<T,T>() {
                @Override
                public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
                    return upstream
                            .subscribeOn(Schedulers.io())    //订阅在io线程
                            .unsubscribeOn(Schedulers.io())  //取消订阅在io线程,为啥要这个,不太清楚
                            .observeOn(AndroidSchedulers.mainThread()) //回调在主线程
                            .compose(view.bindToLifecycle());  //绑定生命周期
                }
            };
        }
    

    结尾

    在网上找了一些操作符的简单例子和应用场景,写了一个Demo,看操作符概述的时候,结合代码运行结果来看更容易理解,有兴趣的可以看看。后续会在这个项目里继续更新操作符和应用场景。
    Github地址:
    https://github.com/Dengszzzzz/DRxJavaSummary

    参考

    给 Android 开发者的 RxJava 详解
    这可能是最好的RxJava 2.x 入门教程
    Android拾萃 - RxJava2操作符汇总
    Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解
    Android Rxjava:这是一篇 清晰 & 易懂的Rxjava 入门教程
    实际应用场景
    Android 用RxJava模拟一个EventBus ———RxBus
    RxLifecycle详细解析
    ...

    相关文章

      网友评论

        本文标题:RxJava2学习总结——操作符和应用场景

        本文链接:https://www.haomeiwen.com/subject/nlmglctx.html