RxJava (RxKotlin) and RxAndroid: Basic Usage

Author: ChenME | Published: 2018-08-08 14:03 | Views: 1293

    RxJava (RxKotlin) and RxAndroid

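    Note: all code in this article is written in Kotlin, so RxKotlin must be imported. The examples use the RxJava 1.x API (Subscriber, onCompleted()), and some RxAndroid material is covered as well.

    Setup:

    1. Add the RxKotlin and RxAndroid version numbers to the project-level build.gradle file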
    buildscript {
        ext.rx_kotlin_version = '1.0.0'
        ext.rx_android_version = '1.2.1'
    }
    
    2. Add the RxKotlin and RxAndroid dependencies to the module-level build.gradle file
    dependencies {
        // RxKotlin RxAndroid
        implementation "io.reactivex:rxkotlin:$rx_kotlin_version"
        implementation "io.reactivex:rxandroid:$rx_android_version"
    }
    

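    1. Useful websites

    1. RxJava documentation
    2. RxJava documentation (Chinese translation)
    3. Classic RxJava reference material

    2. The four elements of the observer pattern

    1. Observable: the subject being observed
    2. Observer: the observer
    3. subscribe: the subscription that connects the two
    4. Event: the data or notification delivered to the observer

    3. Operators

    3.1 Creating operators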

    create
    just
    from
    range
    repeat
    interval
    defer
    empty / never
    timer
    start
    
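    • create builds an Observable from scratch: the lambda receives the Subscriber and calls its onNext(), onCompleted(), and onError() methods directly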
    Observable.create<String> {
        it.onNext("Hello Rx!")
        it.onCompleted()
    }.subscribe(object : Subscriber<String>() {
        override fun onNext(t: String) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
    
    onNext() --> Hello Rx!
    onCompleted()
    
    • just emits the given objects one by one; note that a collection passed to just is emitted as a single item
    Observable.just(1, 1.0, "String", true)
            .subscribe(object : Subscriber<Any>() {
                override fun onNext(t: Any) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    Observable.just(listOf(1, 2, 3, 4, 5))
            .subscribe(object : Subscriber<List<Int>>() {
                override fun onNext(t: List<Int>) {
                    t.forEach { println("onNext() --> $it") }
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 1
    onNext() --> 1.0
    onNext() --> String
    onNext() --> true
    onCompleted()
    onNext() --> 1
    onNext() --> 2
    onNext() --> 3
    onNext() --> 4
    onNext() --> 5
    onCompleted()
    
    • from emits the elements of a collection one by one
    Observable.from(listOf(5, 4, 3, 2, 1, 0))
            .subscribe(object : Subscriber<Int>() {
                override fun onNext(t: Int) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 5
    onNext() --> 4
    onNext() --> 3
    onNext() --> 2
    onNext() --> 1
    onNext() --> 0
    onCompleted()
    
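    • range(start, count) emits a run of consecutive integers to the observer; repeat re-emits the sequence, and repeats indefinitely when no count is given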
    Observable.range(1, 3)
            .repeat(2)
            .subscribe(object : Subscriber<Int>() {
                override fun onNext(t: Int) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 1
    onNext() --> 2
    onNext() --> 3
    onNext() --> 1
    onNext() --> 2
    onNext() --> 3
    onCompleted()
    
    • interval emits an incrementing Long (starting at 0) to the observer at a fixed period
    Observable.interval(2, 2, TimeUnit.SECONDS)
            .subscribe(object : Subscriber<Long>() {
                override fun onNext(t: Long) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 0
    onNext() --> 1
    onNext() --> 2
    onNext() --> 3
    ...
    
    • defer postpones creating the Observable: it is only created when subscribe() is called, so it sees the value arg holds at subscription time
    var arg = "initial value"
    val observable = Observable.defer { Observable.just(arg) }
    arg = "reassigned value"
    observable.subscribe(object : Subscriber<String>() {
        override fun onNext(t: String) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
    
    onNext() --> reassigned value
    onCompleted()
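
    The list above also names empty, never, and timer, which have no example here. The following is a minimal sketch, assuming RxJava 1.x is on the classpath; the helper name record is hypothetical and only exists to capture the callbacks each Observable triggers. It does not cover start.

```kotlin
import rx.Observable
import java.util.concurrent.TimeUnit

// Hypothetical helper: records every callback an Observable triggers on subscription.
fun record(source: Observable<String>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { events.add("onNext($it)") },   // called for each emitted item
        { events.add("onError") },       // called if the source fails
        { events.add("onCompleted") }    // called when the source finishes
    )
    return events
}

fun main() {
    // empty emits no items and completes immediately
    println(record(Observable.empty<String>()))   // [onCompleted]
    // never emits no items and never terminates
    println(record(Observable.never<String>()))   // []
    // timer emits a single 0L after the delay, then completes;
    // toBlocking() makes the calling thread wait for that value
    val tick = Observable.timer(1, TimeUnit.SECONDS).toBlocking().single()
    println(tick)                                  // 0
}
```

    Because timer runs on a background scheduler, the sketch blocks with toBlocking() so that a plain main() does not exit before the value arrives.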
    

    3.2 Transforming operators

    map
    flatMap
    groupBy
    buffer
    scan
    window
    
    • map transforms each emitted item by applying a function to it
    Observable.just(123, 234)
            .map { "¥ $it" }
            .subscribe(object : Subscriber<String>() {
                override fun onNext(t: String) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> ¥ 123
    onNext() --> ¥ 234
    onCompleted()
    
    • flatMap maps each item to an Observable and merges the items those Observables emit into a single stream
    Observable.just(123, 234, 345)
            .flatMap {
                Observable.just("$ $it")
            }.subscribe(object : Subscriber<String>() {
                override fun onNext(t: String) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> $ 123
    onNext() --> $ 234
    onNext() --> $ 345
    onCompleted()
    
    • groupBy splits the source into GroupedObservables, one per key returned by the key selector
    Observable.just(1, 2, 3, 4, 5, 6)
            .groupBy { it % 2 }
            .subscribe(object : Observer<GroupedObservable<Int, Int>> {
                override fun onError(e: Throwable?) {
                }
                override fun onNext(t: GroupedObservable<Int, Int>) {
                    t.subscribe(object : Subscriber<Int>() {
                        override fun onNext(r: Int) {
                            println("group -> ${t.key}, value -> $r")
                        }
                        override fun onCompleted() {
                        }
                        override fun onError(e: Throwable?) {
                        }
                    })
                }
                override fun onCompleted() {
                }
            })
    
    group -> 1, value -> 1
    group -> 0, value -> 2
    group -> 1, value -> 3
    group -> 0, value -> 4
    group -> 1, value -> 5
    group -> 0, value -> 6
    
    • buffer collects items into lists of the given size and emits each list as one item
    Observable.range(0, 7)
            .buffer(3)
            .subscribe(object : Subscriber<List<Int>>() {
                override fun onNext(t: List<Int>) {
                    println(t)
                }
                override fun onCompleted() {
                }
                override fun onError(e: Throwable?) {
                }
            })
    
    [0, 1, 2]
    [3, 4, 5]
    [6]
    
    • scan applies an accumulator function to the items in order and emits every intermediate result
    Observable.range(1, 5)
            .scan { sum, num -> sum + num }
            .subscribe(object : Subscriber<Int>() {
                override fun onNext(t: Int) {
                    println("sum = $t")
                }
                override fun onCompleted() {
                }
                override fun onError(e: Throwable?) {
                }
            })
    
    sum = 1
    sum = 3
    sum = 6
    sum = 10
    sum = 15
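
    The list above also names window, which has no example here. It behaves like buffer, except that each group is delivered as an Observable rather than a List. The following is a minimal sketch, assuming RxJava 1.x; the function name windowsOfThree is hypothetical.

```kotlin
import rx.Observable

// Hypothetical helper: splits 0..6 into windows of three items
// and collects each window back into a list.
fun windowsOfThree(): List<List<Int>> {
    val result = mutableListOf<List<Int>>()
    Observable.range(0, 7)
        .window(3)                   // Observable<Observable<Int>>
        .flatMap { it.toList() }     // turn each window into a single List
        .subscribe { result.add(it) }
    return result
}

fun main() {
    println(windowsOfThree())   // [[0, 1, 2], [3, 4, 5], [6]]
}
```

    The source here emits synchronously, so the lists arrive in window order; with asynchronous windows, concatMap may be the safer choice when order matters.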
    

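    3.3 Filtering operators

    debounce // emits an item only after a given time span has passed without another emission
    distinct // removes duplicates
    elementAt // emits only the item at the given index
    filter // emits only the items that satisfy the given predicate
    first // emits only the first item
    last // emits only the last item
    ignoreElements // drops every item and only forwards onCompleted() or onError()
    sample // emits the most recent item in each periodic sampling interval
    skip // skips the first n items
    skipLast // skips the last n items
    take // emits only the first n items
    takeLast // emits only the last n items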
    
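    • debounce
    Observable.create<Int> {
        try {
            for (i in 1..10) {
                it.onNext(i)
                // an odd value is followed by a 2 s pause, an even value by a 1 s pause
                Thread.sleep(if (i % 2 == 0) 1000L else 2000L)
            }
            it.onCompleted()
        } catch (e: Exception) {
            // report the failure once and stop; onCompleted() must not follow onError()
            it.onError(e)
        }
    }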
            .subscribeOn(Schedulers.io())
            .debounce(2, TimeUnit.SECONDS)
            .subscribe(object : Subscriber<Int>() {
                override fun onNext(t: Int) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 1
    onNext() --> 3
    onNext() --> 5
    onNext() --> 7
    onNext() --> 9
    onNext() --> 10
    onCompleted()
    
    • distinct
    Observable.just(1, 2, 3, 4, 2, 3)
            .distinct()
            .subscribe(object : Subscriber<Int>() {
                override fun onNext(t: Int) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 1
    onNext() --> 2
    onNext() --> 3
    onNext() --> 4
    onCompleted()
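
    Most of the filtering operators listed above have no example here. The following is a minimal sketch of the position-based and predicate-based ones, assuming RxJava 1.x; the helper name collect is hypothetical and simply gathers everything an Observable emits into a list.

```kotlin
import rx.Observable

// Hypothetical helper: blocks until the source completes and returns all emitted items.
fun collect(source: Observable<Int>): List<Int> =
    source.toList().toBlocking().single()

fun main() {
    val numbers = Observable.range(1, 10)   // emits 1..10

    println(collect(numbers.filter { it % 2 == 0 }))   // [2, 4, 6, 8, 10]
    println(collect(numbers.take(3)))                  // [1, 2, 3]
    println(collect(numbers.takeLast(3)))              // [8, 9, 10]
    println(collect(numbers.skip(7)))                  // [8, 9, 10]
    println(collect(numbers.skipLast(7)))              // [1, 2, 3]
    println(collect(numbers.first()))                  // [1]
    println(collect(numbers.last()))                   // [10]
    println(collect(numbers.elementAt(4)))             // [5] (index is zero-based)
    println(collect(numbers.ignoreElements()))         // []
}
```

    sample is left out because its result depends on timing.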
    

    3.4 Combining operators

    zip
    merge
    startWith
    combineLatest
    join
    switchOnNext
    
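    • zip pairs the items emitted by two Observables by position and combines each pair with the rule given by a Func2 function, emitting the results as a new Observable. Once either source completes or fails, the resulting Observable terminates, so surplus items from the other source are dropped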
    val observable1 = Observable.just(10, 20, 30)
    val observable2 = Observable.just(1, 2, 3, 4)
    Observable.zip(observable1, observable2) { o1, o2 ->
        o1 + o2
    }.subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
    
    onNext() --> 11
    onNext() --> 22
    onNext() --> 33
    onCompleted()
    
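    • merge combines the items emitted by two Observables into one Observable, in the order they are emitted. Both sources here emit synchronously, so all of observable1 arrives before observable2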
    val observable1 = Observable.just(10, 20, 30)
    val observable2 = Observable.just(1, 2, 3, 4)
    Observable.merge(observable1, observable2)
            .subscribe(object : Subscriber<Int>() {
                override fun onNext(t: Int) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 10
    onNext() --> 20
    onNext() --> 30
    onNext() --> 1
    onNext() --> 2
    onNext() --> 3
    onNext() --> 4
    onCompleted()
    
    • startWith inserts the items of another Observable before a given Observable starts emitting
    val observable1 = Observable.just(10, 20, 30)
    val observable2 = Observable.just(1, 2, 3, 4)
    observable1.startWith(observable2)
            .subscribe(object : Subscriber<Int>() {
                override fun onNext(t: Int) {
                    println("onNext() --> $t")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 1
    onNext() --> 2
    onNext() --> 3
    onNext() --> 4
    onNext() --> 10
    onNext() --> 20
    onNext() --> 30
    onCompleted()
    
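    • combineLatest combines the latest item of each Observable with the rule given by a Func2 function whenever either one emits, producing a new Observable. Here observable1 finishes synchronously before observable2 starts, so its latest value stays at 30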
    val observable1 = Observable.just(10, 20, 30)
    val observable2 = Observable.just(1, 2, 3, 4)
    Observable.combineLatest(observable1, observable2) { o1, o2 ->
        o1 + o2
    }.subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
    
    onNext() --> 31
    onNext() --> 32
    onNext() --> 33
    onNext() --> 34
    onCompleted()
    
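    • join combines an item from each Observable whenever their validity windows overlap; each of the two duration selectors returns an Observable that closes the window of the item it was called for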
    val observable1 = Observable.just(10, 20, 30)
    val observable2 = Observable.just(1, 2, 3, 4)
    observable1.join(observable2,
            { Observable.timer(2, TimeUnit.SECONDS) },
            { Observable.timer(0, TimeUnit.SECONDS) },
            { t1, t2 -> Observable.just(t1 + t2) })
            .subscribe(object : Subscriber<Observable<Int>>() {
                override fun onNext(t: Observable<Int>) {
                    t.subscribe(object : Subscriber<Int>() {
                        override fun onNext(data: Int) {
                            println("onNext() --> $data")
                        }
                        override fun onCompleted() {
                            println("onCompleted()")
                        }
                        override fun onError(e: Throwable?) {
                            println("onError()")
                        }
                    })
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
    
    onNext() --> 11
    onCompleted()
    onNext() --> 21
    onCompleted()
    onNext() --> 31
    onCompleted()
    onNext() --> 12
    onCompleted()
    onNext() --> 22
    onCompleted()
    onNext() --> 32
    onCompleted()
    onNext() --> 13
    onCompleted()
    onNext() --> 23
    onCompleted()
    onNext() --> 33
    onCompleted()
    onNext() --> 14
    onCompleted()
    onNext() --> 24
    onCompleted()
    onNext() --> 34
    onCompleted()
    onCompleted()
    

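    4. Thread scheduling (with RxAndroid)

    1. Schedulers.io(): I/O threads, for time-consuming work
    2. AndroidSchedulers.mainThread(): the Android UI thread, for UI updates
    3. subscribeOn(): chooses the thread the Observable (the source) runs on
    4. observeOn(): chooses the thread the Observer's callbacks run on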
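
    The four points above can be sketched as follows. This is a minimal, JVM-only sketch assuming RxJava 1.x: Schedulers.computation() stands in for AndroidSchedulers.mainThread() so that it runs without an Android device, and the function name threadHop is hypothetical.

```kotlin
import rx.Observable
import rx.schedulers.Schedulers

// Hypothetical helper: returns (thread that produced the item, thread that observed it).
fun threadHop(): Pair<String, String> =
    Observable
        .fromCallable { Thread.currentThread().name }   // runs on the subscribeOn() scheduler
        .subscribeOn(Schedulers.io())                   // the source works on an I/O thread
        .observeOn(Schedulers.computation())            // downstream callbacks switch threads
        .map { producer -> producer to Thread.currentThread().name }
        .toBlocking()                                   // wait for the result in this sketch
        .single()

fun main() {
    val (producer, consumer) = threadHop()
    println("produced on $producer, observed on $consumer")
}
```

    On Android, the usual form would be observeOn(AndroidSchedulers.mainThread()) followed by a plain subscribe(); blocking with toBlocking() is only used here to keep the sketch self-contained.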

        Article link: https://www.haomeiwen.com/subject/ekztbftx.html