美文网首页
Rxjava使用

Rxjava使用

作者: 满满正能量_617a | 来源:发表于2017-05-19 18:51 被阅读0次

前言

去年就开始使用Rxjava开发,由于当时项目赶着急,没有详细去了解这一块(项目框架被老大搭建好直接使用,所以就偷了点懒),发现Rxjava与Retrofic配合使用的时候特别的简洁,代码变干净了许多;现在有空了需详细地把这一部分知识点补上来,发现其实Rxjava入门还是听容易的。

定义

本质很简单: 异步(库)

特点

逻辑的简洁,代码易读,而非代码量少;

链式结构,可以多次请求,可以直接将前一次服务器返回结果作为作为下一次请求的参数,经过subscribe发送出去;也可以通过observeon()设置数据回调执行的线程。

AsyncTask 、 Handler都是异步操作方法,但是Rxjava与它们相比,优势在于随着程序逻辑变得越来越复杂,它依然能够保持简洁。

API 介绍和原理简析

Rxjava的异步实现,是通过一种扩展的观察者模式来实现的。

观察者模式

观察者对被观察者的某种高度敏感,当被观察者发生改变的时候,观察者就会观察到这样的变化,并且做出相应的响应。

1. Rxjava的观察者模式

基本概念:可观察者,即被观察者(数据发射源):Observable、 观察者:Observer(同subscriber可互转) 、 订阅(即注册):subscribe ()、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系(注册),从而 Observable 可以在需要的时候发出事件来通知 Observer。

订阅者的回调方法有:

*  onNext(T item):普通回调方法,Observable调用这个方法发射数据,方法的参数就是       Observable发射的数据,这个方法可被调用多次;

*  onCompleted(): 正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。

*  onError():当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。

在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个,如图1

图1

2. 基本实现:

* Observer(Subscriber):观察者(数据接收方)

它决定事件触发时将有怎样的行为,Observer是Rxjava的常用接口,同时RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,所以只要实现基本功能选择 Observer 和 Subscriber 效果是完全一样的,他们的区别是:

onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置工作。

unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。

创建一个接收数据的行为:

图2   接受数据

* Observable:被观察者(发射数据方)

它决定什么时候触发事件以及触发怎样的事件。

使用create创建一个Observable对象:

图3   发射数据

* Subscribe :订阅

创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,就可以工作了。代码形式很简单:observable.subscribe(observer);

**  扩展内容

Subscription:Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件

Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有1个参的call()方法,即call(T t),Action2封装了含有2个参数的call方法,即call(T1 t1,T2 t2),以此类推;

Func0:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;

**  基本用法

just( ):将为你创建一个Observable并自动为你调用onNext( )发射数据,见图4:

图4   just()用法

from( ):遍历集合,接收一至九个参数,发送每个item

图5   from()用法

** 变换操作

flatmap():flatMap()接收一个Observable的输出(files中的每一个Observable对象)作为输入,然后作为一个新的Observable再发射。

其中,file为文件数组

图6   flatmap()用法

filter():观测序列中只有通过的数据才会被发射

图7  filter()用法

** 操作符

就是为了解决对Observable对象的变换的问题,用于在Observable和最终的Subscriber之间修改Observable发出的事件。

map( ) :就是用来把一个事件(原始Observable对象)转换为另一个事件的。

图8  map()  方法

3. 线程控制(Scheduler)

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

1)Scheduler在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

Schedulers.immediate(): 这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。

Schedulers.newThread(): 总是未任务启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程。

Schedulers.computation(): 这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。

Schedulers.trampoline( ):当我们想在当前线程执行一个任务时,并不是立即,我们可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()和retry()方法默认的调度器。

AndroidSchedulers.mainThread():Android 专用,它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

* subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

* observeOn(): 指定 Subscriber 所运行在的线程。也叫做事件消费的线程

2)doOnSubscribe()

在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,如图9,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码,将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

图9  

而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() ,他们保持一致的执行线程。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,确保了Subscriber.onStart()与事件发生(被激活)执行在同一线程。

图10

RxJava 与Retrofit结合(简单介绍,后面详细介绍)

首先在模块gradle中添加依赖:

图11

Retrofit 把请求封装进 Observable

定义的Api为:

图12

使用实例:在请求结束后调用 onNext() 返回网络返回的MyAttentionListEntity数据类型

图13

总结:本文只是针对于android开发简单的介绍了一下Rxjava的使用方法,总之Rxjava功能很强大,Observable可以是一个数据库查询、网络请求,Subscriber用来显示查询结果或者是请求结果。

参考:http://blog.csdn.net/lzyzsd/article/details/41833541/

http://gank.io/post/560e15be2dca930e00da1083#toc_4

相关文章

网友评论

      本文标题:Rxjava使用

      本文链接:https://www.haomeiwen.com/subject/qsgdxxtx.html