我十分推荐在项目中使用RxJava 2,它通过使用可观察序列来编写异步程序和切换线程,提供了响应式的变成风格,相比android推出的handler和asynctask,RxJava 2可以让代码更加简洁明晰。
官方网站上对RxJava的定义是:
A library for composing asynchronous and event-based programs using observable sequences for the Java VM.
强调了“异步”,貌似跟多线程没有关系哦!的确是这样的,RxJava 2默认并不支持多线程。好,用代码说服你:
Observable.just(1, 6, 9)
.doOnNext(object : Consumer<Int> {
@Throws(Exception::class)
override fun accept(integer: Int?) {
Log.i(TAG, "Emitting item on: " + currentThread().name + ", value: $integer")
}
})
.map(object : Function<Int, Int> {
override fun apply(p0: Int): Int {
Log.i(TAG,"Processing item on: " + currentThread().name + ", value: $p0")
return p0!! * 2
}
})
.subscribeWith(object : DisposableObserver<Int>() {
override fun onNext(@NonNull integer: Int) {
Log.i(TAG,"Consuming item on: " + currentThread().name + ", value: $integer")
}
override fun onError(@NonNull e: Throwable) {}
override fun onComplete() {}
})
/*
Emitting item on: main, value: 1
Processing item on: main, value: 1
Consuming item on: main, value: 2
Emitting item on: main, value: 6
Processing item on: main, value: 6
Consuming item on: main, value: 12
Emitting item on: main, value: 9
Processing item on: main, value: 9
Consuming item on: main, value: 18
*/
根据注释里的输出结果可以推断,RxJava 2的操作是运行在当前的主线程的,是会被阻塞的。
你可能会问doOnNext()是作什么用的。它只是一个提供副作用的操作符,可以脱离observable链而执行一些不纯的操作。
小试牛刀:多线程
为了理解RxJava 2如何切换线程,你必须对RxJava 2的三个重要的操作符熟悉:Schedulers、observeOn和subscribeOn。
下面举一个多线程的例子:加入从网络上获取一个书籍Book的列表信息,并在UI线程先是出来,进了RxJava 2坑的同学很快都会写出下面的代码:
getBooks().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableObserver<Int>() {
override fun onNext(@NonNull integer: Int) {
// You can access your Book objects here
}
override fun onError(@NonNull e: Throwable) {
// Handler errors here
}
override fun onComplete() {
// All your book objects have been fetched. Done!
}
})
有没有很简洁?getBooks()方法执行网络操作并返回一个Book的列表。网络操作是一个很耗时的操作,我们使用subscribeOn()方法使网络操作在Schedulers.io()线程中进行,然后使用observeOn()方法指定消费者在Schedulers.mainThread()主线程执行操作。
拥抱调度器:Schedulers
你可以把Schedulers认为是一个执行不同任务的线程池。如果你想在一个线程中执行一个任务,那么需要挑选一个合适的调度器Schedulers。RxJava 2提供了几种不同类型的调度器Schedulers,如果你挑选了不合适的Schedulers,那么你的代码就不是最优的,下面看下这几个调度器Schedulers:
- Schedulers.io。通常用来执行一些非即CPU密集型的操作,比如读写文件、网络操作、读写数据库等等。这个调度器没有上限,为了满足需要,它的线程池的数量可以增加。
- Schedulers.computation()。这个调度器常用来执行CPU密集型的操作,比如大量数据集的计算、图片处理等等。它的线程池的数量是有线的。由于此调度器只适合于CPU密集型任务,所以我们希望限制线程的数量,这样它们就不会在CPU时间之间相互争斗,从而使自己饿死。
- Schedulers.newThread()。每次使用这个调度器的时候,都会完全新建一个线程来执行分配的任务。它不会使用线程池,当然也不会享受线程池带来的好处。线程的创建和销毁都很昂贵,因此您应该非常小心,不要滥用过多的线程生成导致严重的系统减速和内存错误。理想情况下,您将很少使用此调度器,主要用于启动一个完全独立的线程来执行长时间运行、隔离的任务。
- Schedulers.single()。这个是RxJava 2新引进的调度器,在RxJava 1中不存在的。它是一个单独的线程,使用顺序的方式执行任务。如果你在后台有很多任务要执行,但是一次只能执行一个,这种情况下使用这个调度器是最合适的。
- Schedulers.from(Executor executor)。这个方法可以让你用自己的Executor来创建自定义的Scheduler。 假设,你想限制并行网络操作在你的应用程序被调用的数量,你可以创建一个定制的调度器并固定线程池大小,Scheduler.from(Executors.newFixedThreadPool(n)),并在代码中网络相关的Observables使用它。
- AndroidSchedulers.mainThread()。这是一个特殊的调度器,在标准的Rxjava库中找不到它,它存在于RxAndroid库。它专门为android程序设计,在UI主线程执行UI相关的操作。默认情况下,它会在与应用程序主线程关联的looper中执行队列任务,但是还有其他的特殊情况,允许我们使用像AndroidSchedulers.from(Looper looper)这样的api来使用任何Looper。
注意:在使用由无边界限制的线程池(如Schedulers.io())支持的调度程序时要小心,因为总是存在无限增长线程池和大量线程泛滥的风险。
理解observeOn和subscribeOn
相信你对Rxjava提供的几种不同的调度器已经有所理解,那么就要理解下observeOn和subscribeOn这两个重要的操作符了。
subscribeOn
这个操作符指定了上游的源观察者释放元素操作所在的线程。如果有一串观察者,那么源观察者总是位于顶部,在这里元素被生成。在前面章节中的第一个代码段中我们没有使用observeOn,你清楚地看到释放操作是在UI主线程中进行的。如果observeOn指定了Schedulers.computation()调度器,那么上游的操作都会在comoutation线程执行,如下面的代码:
Observable.just(2, 3)
.doOnNext { Log.i(TAG,"Emitting item " + it + " on: " + currentThread().getName()) }
.subscribeOn(Schedulers.computation())
.map {
Log.i(TAG,"Mapping item " + it + " on: " + currentThread().getName())
it * it
}
.filter {
Log.i(TAG,"Filtering item " + it + " on: " + currentThread().getName())
it % 2 == 0
}
.subscribe { Log.i(TAG,"Consuming item " + it + " on: " + currentThread().getName()) }
/*
Emitting item 2 on: RxComputationThreadPool-1
Mapping item 2 on: RxComputationThreadPool-1
Filtering item 4 on: RxComputationThreadPool-1
Consuming item 4 on: RxComputationThreadPool-1
Emitting item 3 on: RxComputationThreadPool-1
Mapping item 3 on: RxComputationThreadPool-1
Filtering item 9 on: RxComputationThreadPool-1
* */
这段代码中,我们没有使用完整的DisposableSubscriber,因为我们不需要onError()和onComplete(),只处理onNext()一个单独的消费者就足够了。
在观察链中,observeOn的位置没有任何影响,如果你很好奇,你可以自己试下把上面代码中的observeOn方法放到末尾(但是必须要在消费者即subscribe的前面)。
另一个重要的点是你不能在观察链中多次使用observeOn,如果你那样作的话,其实只有第一个observeOn即最接近源Observable的才会生效。
Observable.just(1, 2, 3, 4, 5, 6)
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.doOnNext { it -> Log.i(TAG,"Emitting item " + it + " on: " + currentThread().getName()) }
.subscribe { it -> Log.i(TAG,"Consuming item " + it + " on: " + currentThread().getName()) }
/*
Emitting item 1 on: RxCachedThreadScheduler-1
Consuming item 1 on: RxCachedThreadScheduler-1
Emitting item 2 on: RxCachedThreadScheduler-1
Consuming item 2 on: RxCachedThreadScheduler-1
Emitting item 3 on: RxCachedThreadScheduler-1
Consuming item 3 on: RxCachedThreadScheduler-1
Emitting item 4 on: RxCachedThreadScheduler-1
Consuming item 4 on: RxCachedThreadScheduler-1
Emitting item 5 on: RxCachedThreadScheduler-1
Consuming item 5 on: RxCachedThreadScheduler-1
Emitting item 6 on: RxCachedThreadScheduler-1
Consuming item 6 on: RxCachedThreadScheduler-1
* */
通过看注释,你已经知道上面代码中只有subscribeOn(Schedulers.io())是生效的,其余无效。为什么呢?我简单暴力地说:Rxjava是链式操作,自上而下,下游的调度器是是上游订阅者指定的,那么要找到这个调度器就要自下而上回溯,自然就找到了距离源Observable最近的subscribeOn指定的调度器才真正起作用。不知道你有没有搞懂。
observeOn()
observeOn可以很方便地切换线程,指定了消费者所在的线程。
Observable.just(2, 3)
.doOnNext { Log.i(TAG,"Emitting item " + it + " on: " + currentThread().getName()) }
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map {
Log.i(TAG,"Mapping item " + it + " on: " + currentThread().getName())
it * it
}
.observeOn(Schedulers.newThread())
.filter {
Log.i(TAG,"Filtering item " + it + " on: " + currentThread().getName())
it % 2 == 0
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG,"Consuming item " + it + " on: " + currentThread().getName()) }
/*
Emitting item 2 on: RxCachedThreadScheduler-1
Emitting item 3 on: RxCachedThreadScheduler-1
Mapping item 2 on: RxComputationThreadPool-1
Mapping item 3 on: RxComputationThreadPool-1
Filtering item 4 on: RxNewThreadScheduler-1
Filtering item 9 on: RxNewThreadScheduler-1
Consuming item 4 on: main
* */
通过上面的代码,你应该知道可以多次调用observeOn操作符,指定后面的操作所在的线程。这段代码还有个不同的地方是Emitting、Mapping和Filtering使用不同的调度器,那么总是先执行完Emitting,再执行完Mapping,然后执行完Filtering,最后执行Consuming,而前面的代码中Emitting、Mapping和Filtering使用相同的调度器,那么总是执行完一个完整的事件(即Emitting、Mapping、Filtering和Consuming),再执行下一个完整的事件。
Rxjava有很多的东西可讲的,我也在不断学习中。希望多多交流,有错误的地方也希望留言指正,我的联系方式:owl@violetpersimmon.com
网友评论