美文网首页
Rxjava使用

Rxjava使用

作者: 满满正能量_617a | 来源:发表于2017-05-19 18:51 被阅读0次

    前言

    去年就开始使用Rxjava开发,由于当时项目赶着急,没有详细去了解这一块(项目框架被老大搭建好直接使用,所以就偷了点懒),发现Rxjava与Retrofic配合使用的时候特别的简洁,代码变干净了许多;现在有空了需详细地把这一部分知识点补上来,发现其实Rxjava入门还是听容易的。

    定义

    本质很简单: 异步(库)

    特点

    逻辑的简洁,代码易读,而非代码量少;

    链式结构,可以多次请求,可以直接将前一次服务器返回结果作为作为下一次请求的参数,经过subscribe发送出去;也可以通过observeon()设置数据回调执行的线程。

    AsyncTask 、 Handler都是异步操作方法,但是Rxjava与它们相比,优势在于随着程序逻辑变得越来越复杂,它依然能够保持简洁。

    API 介绍和原理简析

    Rxjava的异步实现,是通过一种扩展的观察者模式来实现的。

    观察者模式

    观察者对被观察者的某种高度敏感,当被观察者发生改变的时候,观察者就会观察到这样的变化,并且做出相应的响应。

    1. Rxjava的观察者模式

    基本概念:可观察者,即被观察者(数据发射源):Observable、 观察者:Observer(同subscriber可互转) 、 订阅(即注册):subscribe ()、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系(注册),从而 Observable 可以在需要的时候发出事件来通知 Observer。

    订阅者的回调方法有:

    *  onNext(T item):普通回调方法,Observable调用这个方法发射数据,方法的参数就是       Observable发射的数据,这个方法可被调用多次;

    *  onCompleted(): 正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。

    *  onError():当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。

    在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个,如图1

    图1

    2. 基本实现:

    * Observer(Subscriber):观察者(数据接收方)

    它决定事件触发时将有怎样的行为,Observer是Rxjava的常用接口,同时RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,所以只要实现基本功能选择 Observer 和 Subscriber 效果是完全一样的,他们的区别是:

    onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置工作。

    unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。

    创建一个接收数据的行为:

    图2   接受数据

    * Observable:被观察者(发射数据方)

    它决定什么时候触发事件以及触发怎样的事件。

    使用create创建一个Observable对象:

    图3   发射数据

    * Subscribe :订阅

    创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,就可以工作了。代码形式很简单:observable.subscribe(observer);

    **  扩展内容

    Subscription:Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件

    Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有1个参的call()方法,即call(T t),Action2封装了含有2个参数的call方法,即call(T1 t1,T2 t2),以此类推;

    Func0:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;

    **  基本用法

    just( ):将为你创建一个Observable并自动为你调用onNext( )发射数据,见图4:

    图4   just()用法

    from( ):遍历集合,接收一至九个参数,发送每个item

    图5   from()用法

    ** 变换操作

    flatmap():flatMap()接收一个Observable的输出(files中的每一个Observable对象)作为输入,然后作为一个新的Observable再发射。

    其中,file为文件数组

    图6   flatmap()用法

    filter():观测序列中只有通过的数据才会被发射

    图7  filter()用法

    ** 操作符

    就是为了解决对Observable对象的变换的问题,用于在Observable和最终的Subscriber之间修改Observable发出的事件。

    map( ) :就是用来把一个事件(原始Observable对象)转换为另一个事件的。

    图8  map()  方法

    3. 线程控制(Scheduler)

    在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

    1)Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

    Schedulers.immediate(): 这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。

    Schedulers.newThread(): 总是未任务启用新线程,并在新线程执行操作。

    Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程。

    Schedulers.computation(): 这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。

    Schedulers.trampoline( ):当我们想在当前线程执行一个任务时,并不是立即,我们可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()和retry()方法默认的调度器。

    AndroidSchedulers.mainThread():Android 专用,它指定的操作将在 Android 主线程运行。

    有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

    * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

    * observeOn(): 指定 Subscriber 所运行在的线程。也叫做事件消费的线程

    2)doOnSubscribe()

    在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,如图9,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码,将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

    图9  

    而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() ,他们保持一致的执行线程。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

    如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,确保了Subscriber.onStart()与事件发生(被激活)执行在同一线程。

    图10

    RxJava 与Retrofit结合(简单介绍,后面详细介绍)

    首先在模块gradle中添加依赖:

    图11

    Retrofit 把请求封装进 Observable

    定义的Api为:

    图12

    使用实例:在请求结束后调用 onNext() 返回网络返回的MyAttentionListEntity数据类型

    图13

    总结:本文只是针对于android开发简单的介绍了一下Rxjava的使用方法,总之Rxjava功能很强大,Observable可以是一个数据库查询、网络请求,Subscriber用来显示查询结果或者是请求结果。

    参考:http://blog.csdn.net/lzyzsd/article/details/41833541/

    http://gank.io/post/560e15be2dca930e00da1083#toc_4

    相关文章

      网友评论

          本文标题:Rxjava使用

          本文链接:https://www.haomeiwen.com/subject/qsgdxxtx.html