美文网首页
RxJava 操作符第一波

RxJava 操作符第一波

作者: 三季人 | 来源:发表于2017-08-16 11:59 被阅读169次

    上一篇文章RxJava造轮子初步的了解了rxjava的简单原理,更深入的可能还是要去查看源码了,后续会再研究。这个阶段的目标是了解使用常用的操作符。

    创建操作符

    create

    含义:使用一个函数从头开始创建observable
    实现:

    • create()函数中传入一个接受observer观察者的函数Observable.Onsubscrib()

    • Observable.Onsubscriber()函数中只有一个call(Subscriber)方法

    • 在call()方法中调用subscriber.onNext()、onError()、onComplete()

        /**
                 * 创建一个observable
                 * 注意: 在create中函数调用发送消息时候,检查,是否有观察者,没有不发送消息,减少资源消耗
                 * observer.isUnSubscribed()
                 */
                fun create() {
                    Observable.create(Observable.OnSubscribe<String> { t ->
                        if (!(t?.isUnsubscribed ?: true)) {
                            try {
                                for (i in 1..10) {
                                    t?.onNext("" + i)
                                }
                                t?.onCompleted()
                            } catch (e: Exception) {
                                e.printStackTrace()
                                t?.onError(e)
                            }
      
                        }
                    })
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
      
    from

    含义: 将数组或者iterable、future转化为一个observable
    实现: 产生的observable将iterable、数组中的每个item数据发送出去

        /**
                 * from 将数组或者对象,生成新的observable发送出去
                 */
                fun from(){
                    Observable.from(arrayOf(1,2,3,4,5))
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
    
                }
    
    interval

    含义:间接或者无限期的发送数据

        /**
                 * Interval:
                 *      固定时间发送数据
                 *      初始值为0
                 *      无线递增发送数据
                 */
                fun interval(){
                    Observable.interval(1,TimeUnit.SECONDS)
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    just

    含义:发送单个或者多个对象(对多发送10个对象)

        /**
                 * just
                 *      发送单个对象
                 *      参数可选,1-10
                 *      按照参数列表发送数据
                 */
                fun just(){
                    Observable.just(1,2,3,4,5,6,7,8,9,10)
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    range

    含义: 发送指定范围内数据的observable
    如果找不到需要发送的数据,如,起始位负数,发送数据个数不足,抛出异常

        /**
                 * range: 发送整数范围内的有序序列
                 *      第一个参数:整数的起始数
                 *      第二个参数:一共要发送几个数据
                 */
                fun range(){
                    Observable.range(3,3)
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    repeat

    含义:重复发送observable中数据

        /**
                 * repeat
                 *      重复发送源数据源,重复次数可设置
                 *      repeat() 表示无限循环
                 *      repeat(5):表示循环5次
                 *
                 * 其他循环操作符
                 *      满足条件循环
                 *      repeatWhen()
                 *      doWhile()
                 *      whileDo()
                 */
                fun repeat(){
                    Observable.just(1)
                            .repeat(5)
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    timer

    含义:在一段时间之后发送数据

        /**
                 * timer
                 *      在一定延时后发送一条数据
                 */
                fun timer(){
                    Observable.timer(1,TimeUnit.SECONDS)
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    

    变换操作符

    buffer

    含义: 可以将一段时间或者将count个数据打包发送
    : 可以实现backpress 背压操作,将快速产生很多数据,缓存打包发送

        /**
                 * buffer
                 *      buffer(3)  三个为一体,打包发送
                 *      buffer(3,4)
                 *              第一个参数为count:几个数据作为一个打包
                 *              第二个参数为跳跃:跳跃第一个值
                 */
                fun buffer() {
                    Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
        //                    .buffer(3)
                            .buffer(3, 4)
                            .subscribe(Action1<List<Int>> {
                                integers ->
                                integers.forEach {
                                    integer ->
                                    Log.e(TAG, "" + integer)
                                }
    
                                Log.e(TAG, "------------------------------------")
    
                            }, RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    flatMap

    含义:
    将observable发送的每一项数据进行变换操作,转化为多个observables,再使用merge()将所有observables合并
    因为是merge()合并发送,所以发送的顺序不是有序的

        /**
                 * flatMap
                 *      * 合并所有产生的observables,产生的自己数据列不能保证顺序
                 *      1.通过一个指定的函数将源数据源变化为其他数据
                 *      2.新建一个observable发送变化后的数据源
                 *      3.merge合并所有产生的observable->放入新的observable一起发送出去
                 *      4.发送的顺序是无序的
                 * flatMap(function1,maxCount)
                 *      1.第二个参数:从源数据最大同时订阅个数,当达到最大限制,会等待其中一个终止在订阅
                 */
                fun flatMap() {
                    Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
        //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                            .flatMap({
                                t ->
                                var list = arrayListOf(1, 2, 3)
                                Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                            }, 3)
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    concatMap

    含义:和flatMap类似,都是将原始observable发送的每一项数据进行转化,不同的是,concatMap是按照顺序连接每一个发送数据

    • 就是当前一个数据源结束之后接着下一个事件的发送

        /**
                 * concatMap
                 *      * 按照次序连接生成的observables,然后产生自己的数据列
                 *      1.和flatMap操作符类似
                 *      2.不同的是,严格按照源数据的顺序发送数据源
                 */
                fun concatMap() {
                    Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
        //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                            .concatMap({
                                t ->
                                var list = arrayListOf(t, t, t)
                                Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                            })
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
      
    switchMap

    含义: 只按最后发送过来的事件为准,永远只监听最后一个事件

         /**
                 * switchMap
                 *      只监听当前的数据
                 */
                fun switchMap() {
                    Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                            .switchMap({
                                t ->
                                var list = arrayListOf(t, t, t)
                                Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                            })
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    map

    含义: 将observable中发送的源数据,转化为另一种数据

        /**
                 * map
                 *      1.根据你指定的函数将源数据源转化为另一种类型
                 */
                fun map() {
                    Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                            .map {
                                number ->
                                "" + number
                            }
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    

    辅助操作符

    delay

    含义: 1.第一种是对整个observable的延时,在一段时间之后再发送数据
    2.第二章是对observable中的每一项数据发送之前延时

        /**
                 * delay
                 *      延时一段指定的时间,再发送observable数据
                 *      * 整体发射时间延长
                 * delay(observable)
                 *      *: 发射的每一项都会延时
                 *      *: 每一项数据都默认使用这个bservable的定时器
                 *
                 * delaySubscription(long,timeunit)
                 *      *: 延时订阅原始的observable
                 *      *: 整体的延时订阅
                 *
                 */
                fun delay() {
                    Observable.just(1, 2, 3)
        //                    .delay(1,TimeUnit.SECONDS)
        //                    .delay { t ->
        //                        Observable.create<Int> {
        //                            subscriber->
        //                            Thread.sleep(1000)
        //                            subscriber.onNext(t)
        //                            subscriber.onCompleted()
        //                        }
        //                    }
        //                    .delaySubscription(1,TimeUnit.SECONDS)
                            .delaySubscription(object : Func0<Observable<Int>> {
                                override fun call(): Observable<Int> {
                                    return Observable.create<Int> {
                                        subscriber ->
                                        Thread.sleep(1000)
                                        subscriber.onNext(1)
                                        subscriber.onCompleted()
                                    }
                                }
                            })
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    timestamp

    含义:为observable中每一个数据源包装一个时间戳,返回Timestamped<T>类型
    其中t.timestampMillis获取发送这条数据的时间戳
    t.value获取原始发送数据

        /**
                 * timestamp
                 */
                fun timestamp(){
                    Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                            .timestamp()
                            .subscribe(object :Action1<Timestamped<Int>>{
                                override fun call(t: Timestamped<Int>?) {
                                    Log.e(RxUtil.TAG,""+t?.timestampMillis+"value-"+t?.value)
                                }
    
                            })
                }
    
    doEtch生命周期

    含义: observable的整个生命周期,在发送之前调用一下事件

        /**
                 * doEatch
                 *      *: 在observable的对于生命周期之前的时候调用对应代码
                 *  doOnNext:在subscriber->onNext之前调用
                 *  doOnError:在onError->之前调用
                 *  doOnCompleted: 在onComplete->之前调用
                 *  doOnTerminate: observable终止的时候调用(无论是否正常终止)
                 */
                fun doEatch() {
                    Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
        //                    .doOnNext { Log.e(RxUtil.TAG,"doOnNext-onNext") }
        //                    .doOnTerminate{Log.e(RxUtil.TAG,"doOnTerminate-doOnTerminate")}
        //                    .finallyDo{Log.e(RxUtil.TAG,"finallyDo-finallyDo")}
                            .doOnEach(object : Observer<Int> {
                                override fun onNext(t: Int?) {
                                    Log.e(RxUtil.TAG, "doEatch-onNext")
                                }
    
                                override fun onError(e: Throwable?) {
                                    Log.e(RxUtil.TAG, "doEatch-onError")
                                }
    
                                override fun onCompleted() {
                                    Log.e(RxUtil.TAG, "doEatch-onCompleted")
                                }
                            })
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    timeout

    含义: 在一段时间之后没有发送数据,有两种处理
    1. 直接抛出异常
    2. 运行默认的observable数据
    可以结合onErrorReturn进行错误处理

        /**
                 * timeout
                 *      *: 超过一段时间没有发送数据,抛出异常
                 *  timeout(long,timeunit,observable)
                 *      *: 超过一段时间,执行默认的observable
                 */
                fun timeout() {
                    Observable.create<Int> {
                        subscriber ->
                        Thread.sleep(2000)
                        subscriber.onNext(1)
                        subscriber.onCompleted()
                    }
        //                    .timeout(1, TimeUnit.SECONDS)
                            .timeout(1, TimeUnit.SECONDS, Observable.create {
                                subscriber ->
                                subscriber.onNext(-1)
                                subscriber.onCompleted()
                            })
                            .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
                }
    
    toList

    含义:将observable发送的所有数据结果包装一个list集合中,一起发出去
    toSortedList()可以对生成的数据进行排序
    toSortedList(func2())这个自定义实现排序还有问题

        /**
                 * toList
                 *      *: 让observable将多项数据组合成一个list数据返回
                 *  toSortedList
                 *      *: 可以排序,默认自然顺序
                 */
                fun toList(){
                    Observable.from(arrayOf(1, 3, 2, 5, 4, 8, 7, 6))
        //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                            .concatMap({
                                t ->
                                var list = arrayListOf(3, 1, 2)
                                Observable.from(list)
                            })
        //                    .toList()
                            .toSortedList()
        //                    .toSortedList(object :Func2<Int,Int,Int>{
        //                        override fun call(t1: Int?, t2: Int?): Int {
        //                            return t2?.toInt()?:0
        //                        }
        //                    })
                            .subscribe(object :Action1<List<Int>>{
                                override fun call(t: List<Int>?) {
                                    t?.forEach {
                                        Log.e(RxUtil.TAG,"toList-"+t)
                                    }
                                }
    
                            }, Action1<Throwable> {
                                error->
                                Log.e(RxUtil.TAG,"toList-"+error.toString())
                            })
                }
    
    toMap

    含义: 将observable中所有数据结果合并到map中,一起发送出去

        /**
                 * toMap
                 *      *: 将原始所有数据合并到map中,发送这个map
                 *
                 */
                fun toMap(){
                    Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                            .toMap(object :Func1<Int,Int>{
                                override fun call(t: Int): Int {
                                    return 10*t
                                }
                            },object :Func1<Int,String>{
                                override fun call(t: Int?): String {
                                    return ""+t
                                }
    
                            })
                            .subscribe(object :Action1<Map<Int,String>>{
                                override fun call(map: Map<Int,String>?) {
                                    map?.forEach {
                                        (key,value)->
                                        Log.e(RxUtil.TAG, "toMap-key-$key-value-$value")
                                    }
                                }
    
                            }, Action1<Throwable> {
                                error->
                                Log.e(RxUtil.TAG,"toMap-"+error.toString())
                            })
                }
    

    本期的操作符暂时这么多啊,后面还有第二波哦

    附上github地址

    相关文章

      网友评论

          本文标题:RxJava 操作符第一波

          本文链接:https://www.haomeiwen.com/subject/bdzbrxtx.html