上一篇文章RxJava造轮子初步的了解了rxjava的简单原理,更深入的可能还是要去查看源码了,后续会再研究。这个阶段的目标是了解使用常用的操作符。
创建操作符
create
含义:使用一个函数从头开始创建observable
实现:
-
create()函数中传入一个接受observer观察者的函数Observable.Onsubscrib()
-
Observable.Onsubscriber()函数中只有一个call(Subscriber)方法
-
在call()方法中调用subscriber.onNext()、onError()、onComplete()
/** * 创建一个observable * 注意: 在create中函数调用发送消息时候,检查,是否有观察者,没有不发送消息,减少资源消耗 * observer.isUnSubscribed() */ fun create() { Observable.create(Observable.OnSubscribe<String> { t -> if (!(t?.isUnsubscribed ?: true)) { try { for (i in 1..10) { t?.onNext("" + i) } t?.onCompleted() } catch (e: Exception) { e.printStackTrace() t?.onError(e) } } }) .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0()) }
from
含义: 将数组或者iterable、future转化为一个observable
实现: 产生的observable将iterable、数组中的每个item数据发送出去
/**
* from 将数组或者对象,生成新的observable发送出去
*/
fun from(){
Observable.from(arrayOf(1,2,3,4,5))
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
interval
含义:间接或者无限期的发送数据
/**
* Interval:
* 固定时间发送数据
* 初始值为0
* 无线递增发送数据
*/
fun interval(){
Observable.interval(1,TimeUnit.SECONDS)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
just
含义:发送单个或者多个对象(对多发送10个对象)
/**
* just
* 发送单个对象
* 参数可选,1-10
* 按照参数列表发送数据
*/
fun just(){
Observable.just(1,2,3,4,5,6,7,8,9,10)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
range
含义: 发送指定范围内数据的observable
如果找不到需要发送的数据,如,起始位负数,发送数据个数不足,抛出异常
/**
* range: 发送整数范围内的有序序列
* 第一个参数:整数的起始数
* 第二个参数:一共要发送几个数据
*/
fun range(){
Observable.range(3,3)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
repeat
含义:重复发送observable中数据
/**
* repeat
* 重复发送源数据源,重复次数可设置
* repeat() 表示无限循环
* repeat(5):表示循环5次
*
* 其他循环操作符
* 满足条件循环
* repeatWhen()
* doWhile()
* whileDo()
*/
fun repeat(){
Observable.just(1)
.repeat(5)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
timer
含义:在一段时间之后发送数据
/**
* timer
* 在一定延时后发送一条数据
*/
fun timer(){
Observable.timer(1,TimeUnit.SECONDS)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
变换操作符
buffer
含义: 可以将一段时间或者将count个数据打包发送
: 可以实现backpress 背压操作,将快速产生很多数据,缓存打包发送
/**
* buffer
* buffer(3) 三个为一体,打包发送
* buffer(3,4)
* 第一个参数为count:几个数据作为一个打包
* 第二个参数为跳跃:跳跃第一个值
*/
fun buffer() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
// .buffer(3)
.buffer(3, 4)
.subscribe(Action1<List<Int>> {
integers ->
integers.forEach {
integer ->
Log.e(TAG, "" + integer)
}
Log.e(TAG, "------------------------------------")
}, RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
flatMap
含义:
将observable发送的每一项数据进行变换操作,转化为多个observables,再使用merge()将所有observables合并
因为是merge()合并发送,所以发送的顺序不是有序的
/**
* flatMap
* * 合并所有产生的observables,产生的自己数据列不能保证顺序
* 1.通过一个指定的函数将源数据源变化为其他数据
* 2.新建一个observable发送变化后的数据源
* 3.merge合并所有产生的observable->放入新的observable一起发送出去
* 4.发送的顺序是无序的
* flatMap(function1,maxCount)
* 1.第二个参数:从源数据最大同时订阅个数,当达到最大限制,会等待其中一个终止在订阅
*/
fun flatMap() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
// .flatMap { t -> Observable.just("flatmap-change==="+t) }
.flatMap({
t ->
var list = arrayListOf(1, 2, 3)
Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
}, 3)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
concatMap
含义:和flatMap类似,都是将原始observable发送的每一项数据进行转化,不同的是,concatMap是按照顺序连接每一个发送数据
-
就是当前一个数据源结束之后接着下一个事件的发送
/** * concatMap * * 按照次序连接生成的observables,然后产生自己的数据列 * 1.和flatMap操作符类似 * 2.不同的是,严格按照源数据的顺序发送数据源 */ fun concatMap() { Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8)) // .flatMap { t -> Observable.just("flatmap-change==="+t) } .concatMap({ t -> var list = arrayListOf(t, t, t) Observable.from(list).delay(100, TimeUnit.MILLISECONDS) }) .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0()) }
switchMap
含义: 只按最后发送过来的事件为准,永远只监听最后一个事件
/**
* switchMap
* 只监听当前的数据
*/
fun switchMap() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.switchMap({
t ->
var list = arrayListOf(t, t, t)
Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
map
含义: 将observable中发送的源数据,转化为另一种数据
/**
* map
* 1.根据你指定的函数将源数据源转化为另一种类型
*/
fun map() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.map {
number ->
"" + number
}
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
辅助操作符
delay
含义: 1.第一种是对整个observable的延时,在一段时间之后再发送数据
2.第二章是对observable中的每一项数据发送之前延时
/**
* delay
* 延时一段指定的时间,再发送observable数据
* * 整体发射时间延长
* delay(observable)
* *: 发射的每一项都会延时
* *: 每一项数据都默认使用这个bservable的定时器
*
* delaySubscription(long,timeunit)
* *: 延时订阅原始的observable
* *: 整体的延时订阅
*
*/
fun delay() {
Observable.just(1, 2, 3)
// .delay(1,TimeUnit.SECONDS)
// .delay { t ->
// Observable.create<Int> {
// subscriber->
// Thread.sleep(1000)
// subscriber.onNext(t)
// subscriber.onCompleted()
// }
// }
// .delaySubscription(1,TimeUnit.SECONDS)
.delaySubscription(object : Func0<Observable<Int>> {
override fun call(): Observable<Int> {
return Observable.create<Int> {
subscriber ->
Thread.sleep(1000)
subscriber.onNext(1)
subscriber.onCompleted()
}
}
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
timestamp
含义:为observable中每一个数据源包装一个时间戳,返回Timestamped<T>类型
其中t.timestampMillis获取发送这条数据的时间戳
t.value获取原始发送数据
/**
* timestamp
*/
fun timestamp(){
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.timestamp()
.subscribe(object :Action1<Timestamped<Int>>{
override fun call(t: Timestamped<Int>?) {
Log.e(RxUtil.TAG,""+t?.timestampMillis+"value-"+t?.value)
}
})
}
doEtch生命周期
含义: observable的整个生命周期,在发送之前调用一下事件
/**
* doEatch
* *: 在observable的对于生命周期之前的时候调用对应代码
* doOnNext:在subscriber->onNext之前调用
* doOnError:在onError->之前调用
* doOnCompleted: 在onComplete->之前调用
* doOnTerminate: observable终止的时候调用(无论是否正常终止)
*/
fun doEatch() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
// .doOnNext { Log.e(RxUtil.TAG,"doOnNext-onNext") }
// .doOnTerminate{Log.e(RxUtil.TAG,"doOnTerminate-doOnTerminate")}
// .finallyDo{Log.e(RxUtil.TAG,"finallyDo-finallyDo")}
.doOnEach(object : Observer<Int> {
override fun onNext(t: Int?) {
Log.e(RxUtil.TAG, "doEatch-onNext")
}
override fun onError(e: Throwable?) {
Log.e(RxUtil.TAG, "doEatch-onError")
}
override fun onCompleted() {
Log.e(RxUtil.TAG, "doEatch-onCompleted")
}
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
timeout
含义: 在一段时间之后没有发送数据,有两种处理
1. 直接抛出异常
2. 运行默认的observable数据
可以结合onErrorReturn进行错误处理
/**
* timeout
* *: 超过一段时间没有发送数据,抛出异常
* timeout(long,timeunit,observable)
* *: 超过一段时间,执行默认的observable
*/
fun timeout() {
Observable.create<Int> {
subscriber ->
Thread.sleep(2000)
subscriber.onNext(1)
subscriber.onCompleted()
}
// .timeout(1, TimeUnit.SECONDS)
.timeout(1, TimeUnit.SECONDS, Observable.create {
subscriber ->
subscriber.onNext(-1)
subscriber.onCompleted()
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
toList
含义:将observable发送的所有数据结果包装一个list集合中,一起发出去
toSortedList()可以对生成的数据进行排序
toSortedList(func2())这个自定义实现排序还有问题
/**
* toList
* *: 让observable将多项数据组合成一个list数据返回
* toSortedList
* *: 可以排序,默认自然顺序
*/
fun toList(){
Observable.from(arrayOf(1, 3, 2, 5, 4, 8, 7, 6))
// .flatMap { t -> Observable.just("flatmap-change==="+t) }
.concatMap({
t ->
var list = arrayListOf(3, 1, 2)
Observable.from(list)
})
// .toList()
.toSortedList()
// .toSortedList(object :Func2<Int,Int,Int>{
// override fun call(t1: Int?, t2: Int?): Int {
// return t2?.toInt()?:0
// }
// })
.subscribe(object :Action1<List<Int>>{
override fun call(t: List<Int>?) {
t?.forEach {
Log.e(RxUtil.TAG,"toList-"+t)
}
}
}, Action1<Throwable> {
error->
Log.e(RxUtil.TAG,"toList-"+error.toString())
})
}
toMap
含义: 将observable中所有数据结果合并到map中,一起发送出去
/**
* toMap
* *: 将原始所有数据合并到map中,发送这个map
*
*/
fun toMap(){
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.toMap(object :Func1<Int,Int>{
override fun call(t: Int): Int {
return 10*t
}
},object :Func1<Int,String>{
override fun call(t: Int?): String {
return ""+t
}
})
.subscribe(object :Action1<Map<Int,String>>{
override fun call(map: Map<Int,String>?) {
map?.forEach {
(key,value)->
Log.e(RxUtil.TAG, "toMap-key-$key-value-$value")
}
}
}, Action1<Throwable> {
error->
Log.e(RxUtil.TAG,"toMap-"+error.toString())
})
}
本期的操作符暂时这么多啊,后面还有第二波哦
附上github地址
网友评论