创建被观察者(Observable),定义要发送的事件。
创建观察者(Observer),接受事件并做出响应操作。
观察者通过订阅(subscribe)被观察者把它们连接到一起。
new Thread() {
@Override
public void run() {
Log.d(TAG, "Thread run() 所在线程为 :" + Thread.currentThread().getName());
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "Observable subscribe() 所在线程为 :" + Thread.currentThread().getName());
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "Observer onSubscribe() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.d(TAG, "Observer onNext() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Observer onError() 所在线程为 :" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "Observer onComplete() 所在线程为 :" + Thread.currentThread().getName());
}
});
}
}.start();
输出结果为:
Thread run() 所在线程为 :Thread-2
Observer onSubscribe() 所在线程为 :Thread-2
Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
Observer onNext() 所在线程为 :main
Observer onNext() 所在线程为 :main
Observer onComplete() 所在线程为 :main
image.png
从上面的例子可以看到:
Observer(观察者)的onSubscribe()方法运行在当前线程中。
Observable(被观察者)中的subscribe()运行在subscribeOn()指定的线程中。
Observer(观察者)的onNext()和onComplete()等方法运行在observeOn()指定的线程中。
一句话总结RxJava子线程和主线程的原理
切换到子线程的关键是将Runnable放到线程池去执行,切换到主线程是利用在主线程实例化的Handle发送Message,让runnable回调到主线程去。
网友评论