美文网首页
大话RxJava 的基本及应用

大话RxJava 的基本及应用

作者: DorisSunny | 来源:发表于2017-03-17 21:09 被阅读0次

    RxJava

    RxJavaGithub的wiki的介绍:

    RxJavais a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

    大概意思是RxJavaJava VM上一个使用可观测序列来组成的一个异步的、基于事件的库。

    从这么一句话,看来有两个关键点: ① 异步 ② 事件 ③可观测 。 概括性的话总是那么抽象,看了这句话,还是不清楚,RxJava到底是干什么的。一个亘古不变的道理, 实践出真理。只有你用了,你才知道它是什么。

    在我看来,RxJava就是一个异步的扩展的观察者模式。区别于AsynTask以及Handler的优点在于代码简洁,链式调用。但是注意啦,我这里说的代码简洁,可不是说它的代码量少,而是因为它逻辑清晰简洁,当你回顾代码的时候,可以方便的看出当初写的逻辑。而不像Handler,要找到发送message的地方,以及接受message的地方, 更有各种缩进让你一番好找,不方便代码的回溯。而RxJava的链式调用,使整体代码结构看起来很清晰。

    RxJava 的神奇之处不仅在于异步的观察者模式,更强大的在于它的操作符,它提供了大量的操作符来帮助你更好的解决复杂的逻辑。结合Retrofit可以很方便的嵌套请求网络,解决类似当你要获取服务器上面的数据的时候,首先要登录,而登录之前要先获取服务器端的一个认证挑战字。代码如下:

    InnerServerApi.requestLoginCode(mLoginCodeUrl, mAccount).subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
                    .flatMap(new Func1<LoginCode, Observable<AAAData>>() {
                        @Override
                        public Observable<AAAData> call(final LoginCode loginCode) {
                            Log.d(TAG, loginCode.getLoginCode() + "HAHAHA");
                            return InnerServerApi.requestLoginData(mAAALoginUrl, loginCode.getLoginCode(), mAccount, mPassword);
                        }
                    }).subscribeOn(Schedulers.io()).subscribe(new Subscriber<AAAData>() {
                        @Override
                        public void onCompleted() {
                            Logger.i(TAG, "onCompleted");
                        }
                        
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "login:onFailure");
                        }
                        
                        @Override
                        public void onNext(AAAData loginData) {
                            Log.d(TAG, "login:onSuccess:");
                        }
                        
                    });
    

    我们不说代码量是否多了,反正逻辑是清晰了,这才是我们追求的。 如果采用AsyncTask避免不了的CallBack嵌套,嵌套到回溯起来,自己一时半会都反应不过来呢。 这就是RxJava迷人地方之一,也是最迷人的操作符。下面,我会讲一些比较通用的操作符。更多操作符也可以查看API,里面有图片为你展示事件序列经过操作符之后的流向。

    下面就开始学习基本概念啦。。。


    RxJava的四个关键

    1. Observable 被观察者,事件的发出者。
    2. Observer 观察者,被动观察事件的发出并做相应的处理。 通常我们都是用SubscriberSubscriber是继承自Observer的一个类,其实就算你实例化的是一个Observer对象,底层也是用Subscriber包裹了这个Observer对象来实现相应的逻辑的,所以可以直接使用Subscriber
    3. event 事件 被观察者发出的事件
    4. 订阅 (Subscribe),观察者订阅被观察者,即观察者监听被观察者的动态,一旦被观察者发出了什么动作就传送到了观察者那里,举个例子,就像我们关注某个微信公众号,关注这个动作就相当于 订阅, 然后微信公众号有新的消息推送,我们就可以看到。我们就是观察者,公众号就是被我们观察的对象。

    通过这四个关键,更加说明了它是一个异步的扩展的观察者模式。

    RxJava 的家长里短

    RxJava的功能强大在于通过操作符变换,线程调度灵活的实现各种事件序列的处理,我觉得Observable的每一个方法都可以描述成一个操作符,下面就用操作符来说。

    RxJava的观察者模式如图:

    RxJava1.png

    简单使用模板

    使用RxJava 基本的步骤是下面三步:

    创建被观察者

    Observable 创建一个Observable的方法有很多个。其中一个就是create,有关于更多的创建操作符,创建型操作符 点这个可以了解到。

    Observable  observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
              @Override
             public void call(Subscriber<? super Integer> subscriber) {
                  try {
                      subscriber.onNext(3);
                      subscriber.onNext(4);
                     subscriber.onCompleted();
                  } catch (Exception e) {
                      subscriber.onError(e);
                 }
              }
         })
    
    

    call 方法里面是被观察者订阅之后要向观察者发出的事件。

    创建观察者

    Subscriber subscriber = new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
    
                }
    
               @Override
               public void onError(Throwable e) {
                    Log.i(TAG, e.toString());
                    }
    
                @Override
                public void onNext(Integer integer) {
                    Log.i(TAG, integer + "");
                }
            }
    

    onCompleted 是整个事件序列结束之后调用的方法。在一个正确运行的事件序列中,onCompleted()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个,在自己创建的Observable里面应该注意控制。
    onError 是当事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
    onNext,相当于监听按钮点击事件里面的onClick,即接收到被观察者发出的事件时,观察者做处理的函数。

    其实还有一个onStart()方法,不过这个是Subscriber中新添加的,Observer里面是没有这个方法的。在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法

    订阅

      //被观察者,主动去订阅观察者,为了实现链式调用就设计成观察者主动订阅观察者。
      observeable.subscribe(subscriber);
    

    这样就完成了一个事件的订阅的流程。

    线程控制Scheduler

    在RxJava中我们可以通过Scheduler来实现多线程,指定事件发送的线程,以及事件处理的线程。事件发送就是Observable的call方法执行的代码,通过subscribeOn我们可以指定call方法执行的线程。事件处理即Subscriber事件处理方法执行的线程,通过observerOn可以指定事件处理的线程。不过调用subscribeOnobserverOn切换线程,没有再做切换,则此次事件一直处于最后一次切换的线程。

    需要注意的是,①observerOn可以多次调用并且有效,但SchedulerOn 虽然也可以多次调用,但是只有第一次是有效的。②默认的Observable的产生事件,以及通知事件给Observer是在同一个线程里面的,这个线程就是subscribe所在的线程。③有些操作符是有默认的调度器的,可以查看官方文档。比如timer默认就是computation Scheduler.

    RxJava内置Scheduler

    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行,如果希望Observer的事件处理发生在主线程的话,就要调用observerOn(AndroidSchedulers.mainThread()).

    常用的几个操作符

    • 创建型操作符

    from

    github介绍:

    from() — convert an Iterable, a Future, or an Array into an Observable

    from(java.lang.Iterable<? extends T> iterable) /from(T[] array),Observable可用于将传入的数组或者集合中拆分成具体的对象,分发下去。

     ArrayList<Integer> list = new ArrayList<>();
            list.add(1);
            list.add(2);
            list.add(0);
            list.add(4);
            Observable.from(list); //观察者依次接收到 1,2,0,4.
    

    just

    just() — convert an object or several objects into an Observable that emits that object or those objects

    just(T t1, T t2··· T tn),其实just与from 差不多,也是将t1, t2··· tn依次分发下去。

    Observable.just("Tom" , "John", "Mary");
    

    timer

    timer() — create an Observable that emits a single item after a given delay

    Paste_Image.png

    常用的timer(long delay, java.util.concurrent.TimeUnit unit), delay延迟的时间,unit延迟的时间的单位。用于定时在一定时间之后,再分发事件,观察者接收到信号,做相应处理。相当于Handler里面的延迟发送。

            Observable.timer(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io())
                    .subscribe(new Subscriber<Long>() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            mThread = Thread.currentThread();
                            Log.i(TAG, "name2" +mThread.getName() + "pid" + mThread.getId());
                        }
                    });
    

    interval

    interval( ) — create an Observable that emits a sequence of integers spaced by a given time interval

    interval(long interval, java.util.concurrent.TimeUnit unit) ,相当于Timer定时器,定时分发事件。与timer的区别在于可以定时的重复分发事件。而timer只操作一次。repeatWhen跟interval的作用是一样的。

    • 过滤型操作符

    filter

    filter() — filter items emitted by an Observable

    Paste_Image.png

    可以过滤事件,只有符合条件的事件才能被继续分发下去。

        Observable.just(1,2,0,3).filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                //过滤掉等于0的事件
                    if(integer != 0) {
                      return integer;
                    }
                    return false;
                }
            })
    

    last

    last() — emit only the last item emitted by an Observable

    只能发整个事件序列的最后一个事件。同理,first(),只分发整个事件序列的第一个事件。

    Paste_Image.png
    • 强大的转换操作符

    RxJava中最强大的就是转换型操作符了,可以对Observable进行一些转换,做更多的操作,实现嵌套之类的逻辑。在我看来转换操作符的作用就是将一个Observable转换成另外一个Observable,我们这里假设只有两个Observable,当订阅之后,第一个Observable的call方法首先被调用,即第一个Observable发送事件序列,第二个Observable对第一个Observable发送出来的事件做处理(比如filiter对事件进行过滤)或者对第一个Observable返回的数据类型进行处理转换成另外一个对象或者Observable(比如Map以及flatMap)然后第二个Observable开始发送事件序列,最后在Subscribe里面进行处理。在官方的API里面有形象的"弹珠图"来演示事件发送的发展。可以点这里去看 ===> RxJava API

    map

    map将一个对象转换成另外一个对象分发下去,返回的是一个Observable对象。

    Paste_Image.png
    Observable.just(1,2,3,4).map(new Func1<Integer, String>() {
                            @Override
                            public String call(Integer i) {
                                Log.i(TAG, "integer" +i);
                                return s;
                            }
                        });
    

    上述代码就是将Integer转换成String对象分发下去。

    flatMap

    flatMap可以网络请求的嵌套,比如请求服务器要先申请到挑战字 再登录服务器,代码就可以如下使用:

    Paste_Image.png
     InnerServerApi.requestLoginCode(mLoginCodeUrl, getSN()).subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
                    .flatMap(new Func1<LoginCode, Observable<AAAData>>() {
                        @Override
                        public Observable<AAAData> call(LoginCode loginCode) {
                            Log.d(TAG, loginCode.getLoginCode());
                            String displayId = android.os.Build.DISPLAY;
                   return InnerServerApi.requestLoginData(mAAALoginUrl,loginCode.getLoginCode(), "doris", mPassword);
                        }
                        // if error, retry ,3 times.
                    }).subscribeOn(Schedulers.io()).retry(3).subscribe(new Subscriber<AAAData>() {
                @Override
                public void onCompleted() {
                    Logger.i(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, e.toString());
                    Log.i(TAG, "login error");
                }
    
                @Override
                public void onNext(AAAData loginData) {
                    Log.i(TAG, "login Success");
                }
    
            });
    

    mapflatMap 的区别在于map 是一对一,而且map是直接返回一个对象,而flatMap则是返回Observable,用于分发事件。flatMap的一对多的体现看下面这段代码:

    Observable.from(folders)
        .flatMap(new Func1<File, Observable<File>>() {
            @Override
            public Observable<File> call(File file) {
                return Observable.from(file.listFiles());
            }
        })
    

    这段代码,先通过第一个Observable把一个个目录发送出去,然后通过flatMap再把目录中的文件一个个分发下去。这是map做不到的。

    compose

    Observable进行一个整体的变化,flatMap只是对接收到的事件一个一个的转换,而compose是对整个Observable做一些处理。

    //自定义一个转换器
    final Observable.Transformer    schedulersTransformer   = new Observable.Transformer() {
                            @Override
                            public Object call(Object observable) {
                                return ((Observable) observable).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io())
                                        .observeOn(AndroidSchedulers.mainThread());
                            }
                        };
                        
    //这样就对observable进行了上面call方法里面的操作。
    observable.compose(schedulersTransformer); 
    
    1. 还有很多zipWithmerge等这些合并Observable,可以看这里Combing操作符
    • 错误处理操作符

    retry

    retry(long count) 常用的retry方法,当发生错误时,retry count次,比如我们网络请求的时候失败了,我们可以重新请求三次,retry(3).

    Paste_Image.png

    onErrorResumeNext

    当发生错误的时候,调用这个操作符里面的方法,将当前的Observable转换成另一个Observable继续发送事件。应用场景,当我们请求当前服务器失败的时候,可以选择一个备用的服务器地址重新请求数据。

    Paste_Image.png

    RxJava 与 Retrofit 结合实例

    RetrofitRxJava结合可以更好的完成网络请求。不了解Retrofit的,赶紧去学习吧,Retrofit的底层使用OkHttp完成网络请求。下面给大家讲讲,我用到的网络请求的例子以及踩过的坑。

    • 从服务器获取文件,失败的话重试

    /**  downloadFile是返回的是一个Observable对象,picUrl是要下载的图片url,我将这个事件发送在IO线程中执行。
    retry(3),如果失败的话,重试三次,下载文件
    map 将得到的数据写入到文件中。
    **/
    downloadFile(picurl).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).retry(3).filter(new Func1<String, Boolean>() {
                @Override
                public Boolean call(String picUrl) {
                                    //调用filter,过滤掉null的Url,如果picUrl是空的话,那么就不下载
                    return !TextUtils.isEmpty(picUrl) && !picUrl.equals("null");
                }
            }).map(new Func1<ResponseBody, String>() {
                @Override
                public String call(ResponseBody responseBody) {
                    return writeToFile(responseBody, mFileName);
                }
            }).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Logger.i(TAG, "testAAA downloadCompleted");
                }
                
                @Override
                public void onError(Throwable e) {
                    Logger.i(TAG, "testAAA downloadFail" + e.toString());
                }
                
                @Override
                public void onNext(String   s) {
                                   //返回 下载的文件存放路径
                    Logger.i(TAG, "onNext" + s);
                                    
                }
            });
    

    在上述代码中,通过filter过滤掉了不合理的url,其实也可以添加一个判断是否是一个Http网址的判断,你可以自己试试。并且通过map直接将下载到的数据,存储到了文件当中,而且如果下载过程中出现类似的 socket timeout错误,可以通过,retry重新请求,一个简单的链式调用就下载文件并保存的逻辑写好了。 So easy! 必须给RxJava怒赞!!!

    • 嵌套网络请求

    InnerServerApi.requestLoginCode(mLoginCodeUrl, mAccount).subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
                    .flatMap(new Func1<LoginCode, Observable<AAAData>>() {
                        @Override
                        public Observable<AAAData> call(final LoginCode loginCode) {
                            Log.d(TAG, loginCode.getLoginCode() + "HAHAHA");
                            return InnerServerApi.requestLoginData(mAAALoginUrl, loginCode.getLoginCode(), mAccount, mPassword);
                        }
                    }).subscribeOn(Schedulers.io()).subscribe(new Subscriber<AAAData>() {
                        @Override
                        public void onCompleted() {
                            Logger.i(TAG, "onCompleted");
                        }
                        
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "login:onFailure");
                        }
                        
                        @Override
                        public void onNext(AAAData loginData) {
                            Log.d(TAG, "login:onSuccess:");
                        }
                        
                    });
    

    解释一下,requestLoginCode是通过Retrofit定义的一个返回Observable对象的一个方法,获取我要登录服务器需要用到的 验证字段, 得到验证字段之后,通过flatMap直接通过得到的LoginCode调用登录方法requestLoginDatarequestLoginData返回的也是一个Observable对象,然后订阅,最后获取到登录返回的数据。这样说可能大家不明白,我说说我这个代码的应用场景,你要登录一个服务器,首先你要向他申请一个跟你的账号相关的 验证码,然后再通过验证码,以及账号密码登录。 就想你要吃苹果,那么你要先买个水果刀,然后才能切水果最后吃到水果。 大家可以回想一下,没有RxJava之前,利用AsyncTask我们都是如何完成这个逻辑的,首先获取到LoginCode,然后再Callback的onSuccess方法中再调用一个AsyncTask请求,然后就出现了很多迷之缩进,相比之下,RxJava简直是飞流直下,逻辑明了清晰。不相信的机智的你,可以写一段对比一下。 我这里就不贴出来的。

    • 错误处理 onErrorResumeNext

    在应用开发中,会遇到这样的需求,有几个备份的网络地址,当你请求第一个网络地址不成功的时候,你想要用备份的网络地址。RxJava可以很方便的,帮你捕捉到错误,并且用备份的地址,重新请求。 就获取网络时间的代码,来举个例子吧。

        Observable.create(new Observable.OnSubscribe<Date>() {
                @Override
                public void call(Subscriber<? super Date> subscriber) {
                    try {
                                          //第一次从百度上面获取网络时间
                        mTimeUrl = "http://www.baidu.com/";
                        Logger.d(TAG, "mTimeUrl" + mTimeUrl);
                        Date date = GetNetworkTime.getWebsiteDate();
                        Logger.d(TAG, "call date" + date);
                        subscriber.onNext(date);
                        subscriber.onCompleted();
                    } catch (IOException e) {
                                          //如果网络超时则调用onError,触发onErrorResumeNext通过备份的地址 http://www.beijing-time.org 获取网络时间。
                        subscriber.onError(e);
                        e.printStackTrace();
                    }
                }
            }).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).onErrorResumeNext(new Func1<Throwable, Observable<? extends Date>>() {
                @Override
                public Observable<? extends Date> call(Throwable throwable) {
                    return Observable.create(new Observable.OnSubscribe<Date>() {
                        @Override
                        public void call(Subscriber<? super Date> subscriber) {
    //通过http://www.beijing-time.org 请求时间,如果这次访问失败,则获取网络时间失败 
                            mTimeUrl = "http://www.beijing-time.org";
                            try {
                                Date date = GetNetworkTime.getWebsiteDate();
                                Logger.d(TAG, "call1" + date);
                                subscriber.onNext(date);
                                subscriber.onCompleted();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                        }
                    });
                }
            }).subscribe(new Observer<Date>() {
                @Override
                public void onCompleted() {
                    Logger.d(TAG, "onCompleted");
                }
                
                @Override
                public void onError(Throwable e) {
                }
                
                @Override
                public void onNext(Date date) {
                    Logger.d(TAG, "call3" + date);
                    netWorkCallback.doSometing(date);
                }
            });
    

    在上述代码中,当第一个Observable发送获取网络时间的时候超时的时候,我们自己调用OnError方法,从而onErrorResumeNext拦截到错误并且将当前的Observable转换成另外一个Observable,通过这个新的Observable继续发送新的事件,这里的新的事件就是通过备份的url,再次获取网络是事件。这个就可以完成多个地址切换请求网络数据,当第一个地址不成功,换第二个地址。这在做应用开发时也经常用到。我觉得挺实用的,当然onErrorResumeNext的应用场景还有很多,就等你慢慢发掘吧。

    总结

    1. 在用RxJava的时候,要注意当前的操作是哪个线程。注意不要把应该在UI线程操作的放在了子线程,也不要把大量的操作放在主线程。

    2. Observable 发送一串事件序列的时候,如果其中有一个出错了,那么接下来的事件,观察者都不会接收到。即 OnError是整个事件结束的标志,如果出错了,并且没有做什么出错处理,那么就直接调用了OnError,结束整个事件。 所以这意味着,你利用RxJava发送了一个下载十个文件的事件序列时,如果其中有一个文件下载失败,其他的文件就停止下载了。这个时候你只能用,for循环,然后一个一个下载,方便控制。

    参考链接

    1. 给 Android 开发者的 RxJava 详解

    2. 官方文档

    3. RxJava API

    最后的最后,我说的哪里不对的或者有什么问题,欢迎留言,大家共进步。O

    相关文章

      网友评论

          本文标题:大话RxJava 的基本及应用

          本文链接:https://www.haomeiwen.com/subject/iqzjnttx.html