- 原文链接:Observe on the correct thread
- 原文作者: Dionysis Lorentzos
- 译文出自: 小鄧子的简书
- 译者: 小鄧子
- 状态: 完成
尽管很多人了解RxJava的基本逻辑,但是在Observable链和操作符究竟运行在哪个线程,仍然会有许多困惑。
首先,让我们梳理清晰,在RxJava中.subsribeOn( )
和.observeOn( )
区别:
-
.subsribeOn( )
操作符可以改变Observable应该在哪个调度器上执行任务。 -
.observeOn( )
操作符可以改变Observable将在哪个调度器上发送通知。 -
另外,你需要知道,默认情况下,链上的操作符将会在调用
.subsribeOn( )
的那个线程上执行任务。
一些例子##
1. 主线程或者 .subscribe( )所在线程
如果在Android的Activity下onCreate( )
方法中,也就是主线程中使用如下代码:
Observable.just(1,2,3)
.subscribe( );
表现会像这样:

2. 调用 .subscribeOn( )
尽管代码片段在主线程中,但是整个代码块将运行在.subscribeOn( )
定义的线程上:
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.subscribe();
表现会像这样:

3. 调用 .observeOn( )
如果你的代码片段在主线程中,默认情况下Observable的创建是在.subscribeOn( )
定义的线程上,但是,调用.observeOn( )
之后,余下的代码将会执行在.observeOn( )
所定义的线程上:
Observable.just(1,2,3)
.observeOn(Schedulers.newThread())
.subscribe();

3. 合并逻辑
照理合并操作符,放在一起就像这样:
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe();

一些技巧##
1. UI线程运行异常
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.subscribe(/** 与UI线程相关的逻辑 **//);
很明显,这是错误哒。
2. 保证逻辑运行在工作线程中
如果存在以下代码片段:
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(/** 与UI线程无关的逻辑**//)
.subscribe();
请用以下代码替代:
Observable.just(1,2,3)
.subscribeOn(Schedulers.newThread())
.flatMap(/** 与UI线程无关的逻辑**//)
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
通过用第二段代码代替第一段,.flatMap( )
操作符(或者在这一点的其他逻辑操作符)将运行在后台线程。这样做就不会阻塞UI线程,同时可以防患ANR或其他类似问题的发生。看起来有点像AsyncTask模式,尽可能的把逻辑放在的.doInBackground( )
中,而不是.onPostExecute( )
。
3. 取决于更早的 .subscribeOn( )
以下代码:
Observable.just(1,2,3)
.subscribeOn(thread1)
.subscribeOn(thread2)
.subscribe();
因为thread1的逻辑将会覆盖thread2,所以Observable的创建和.subscribe( )
的逻辑处理都将运行在thread1中。因此,根本没有必要写多个.subscribeOn( )
操作符。
网友评论
System.out.println("subscribeOn(Schedulers.newThread() " + Thread.currentThread().getName());
return s;
}).subscribeOn(Schedulers.io()).map(s -> {
System.out.println("subscribeOn(Schedulers.io() " + Thread.currentThread().getName());
return s;
}).observeOn(Schedulers.io()).map(s -> {
System.out.println("observeOn(Schedulers.io() " + Thread.currentThread().getName());
return s;
}).observeOn(Schedulers.computation()).map(s -> {
System.out.println("observeOn(Schedulers.computation() " + Thread.currentThread().getName());
return s;
}).subscribeOn(Schedulers.immediate()).map(s -> {
System.out.println("subscribeOn(Schedulers.immediate() " + Thread.currentThread().getName());
return s;
})
.subscribe((a) -> {
System.out.println("onComplete " + Thread.currentThread().getName());
});
验证了下,结果一致,
subscribeOn(Schedulers.newThread() RxNewThreadScheduler-1
subscribeOn(Schedulers.io() RxNewThreadScheduler-1
observeOn(Schedulers.io() RxCachedThreadScheduler-1
observeOn(Schedulers.computation() RxComputationThreadPool-3
subscribeOn(Schedulers.immediate() RxComputationThreadPool-3
onComplete RxComputationThreadPool-3
有些case是需要用到多个.subscribeOn( )的,比如
// 当前线程不是UI线程
Observable.create(...) // 这是一个IO操作
.subscribeOn(Schedulers.io())
.doOnSubscribe(() -> view.XXX) // 这是一个UI操作,例如显示进度条
.subscribeOn(AndroidSchedulers.mainThread())
...