如果想要在RxJava中引入多线程的功能,可以使用一些操作符使得RxJava在指定的线程上运行。
比如说一些耗时的操作,如我们看电影,一个线程在后台下载数据,然后播放视频在当前的线程,这样就可以给我们更好的观影体验。
默认情况下,Observable和其链式的操作,包括调用subscrbe方法都会在同一个线程上。SubscribeOn操作符指定Observable在特定的一个线程上进行操作,ObserveOn操作符则指定了Observable在哪个线程上通知它的观察者(Observer)。
官方提供了一个示意图:
- SubscribeOn操作符指定了Observable初始时运行的线程,不管在哪个位置调用的SubscribeOn
- ObserveOn影响后续的操作符运行的线程,因此我们可以多次调用ObserveOn
运行一段代码即可看出其中关系:
public class RxSchedulerDemo {
public static void main(String[] args) throws InterruptedException {
Observable.just("Scheduler Demo")
.map(s -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
return s;
})
// 指定最开始时要运行的线程
.subscribeOn(Schedulers.newThread())
// 指定接下来的操作要运行的线程
.observeOn(Schedulers.newThread())
.map(s -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
return s;
})
.subscribe(s -> System.out.println("当前线程:" + Thread.currentThread().getName()));
;
Thread.sleep(1000);
Observable.just("Scheduler Demo")
.map(s -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
return s;
})
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.newThread())
.map(s -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
return s;
})
.subscribe(s -> System.out.println("当前线程:" + Thread.currentThread().getName()));
;
Thread.sleep(1000);
Observable.just("Scheduler Demo")
.map(s -> {
System.out.println("当前线程:" + Thread.currentThread().getName());
return s;
})
.subscribeOn(Schedulers.newThread())
.subscribe(s -> System.out.println("当前线程:" + Thread.currentThread().getName()));
}
}
里面有几个关键的类,顺便提一下。
- Scheduler: 抽象类,可以创建Worker
- Worker:真正做线程调度的类,schedule方法做线程调度,入参为Runnable
最后
对RxJava的Schedules有个概念方便使用,想要更深入其中的内容,还需要再探索其中的源码才行啊。
网友评论