此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著
四、使用Scheduler进行线程调度
4.1 什么是Scheduler
RxJava 是一种响应式编程框架,响应式编程就是围绕着异步数据进行的编程,而RxJava就是使用Scheduler来实现异步的。Observable默认是单线程的,而大部分的Observable默认工作在Subscriber调用subscribe方法进行订阅时所在的线程中。但对响应速度很高的UI线程,理想情况是耗时的Observable工作在一个后台线程上,当工作完成后在UI线程上通知Subscriber来更新UI。 RxJava使用subscribeOn和observeOn将Scheduler应用到Observable和Subscriber上,并提供多种不同的Scheduler,分别是io,immediate,trampoline,computation等。此外RxJava还提供了一个from方法,可以根据传入的Executor对象来创建Scheduler。
多次调用了observeOn和subscribeOn,到底Observable和Subscriber会工作在哪个线程里呢?
-多次调用observeOn会影响从其调用位置开始的后面所有操作符合Subscriber。所以如果我们只想更改Subscriber的Scheduler, 应该在所有操作符的后面使用observeOn。
-多次调用subscribeOn时,只有最上面的那个起作用,所以subscribeOn只调用一次就行了,多了没有效果且造成困扰。
private void scheduler() {
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("start:" + Thread.currentThread().getName());
subscriber.onNext(1);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread())
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
System.out.println(integer + ":" + Thread.currentThread().getName());
return integer + 1;
}
})
.observeOn(Schedulers.io())
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
System.out.println(integer + ":" + Thread.currentThread().getName());
return integer + 1;
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("action: " + Thread.currentThread().getName());
}
});
}
运行结果如下,因为最上面的subscribeOn使用的是newThread类型的Scheduler,所以create和map都运行在这个Scheduler上;然后后面observeOn使用的是io类型的Scheduler,所以接下来的map和Subscriber都会运行在io的Scheduler上。注意后面还有subscribeOn使用computation类型的Scheduler,但是因为上面已经有了observeOn操作符,所以computation类型的Scheduler并没起作用。
start:RxNewThreadScheduler-1
1:RxNewThreadScheduler-1
2:RxIoScheduler-2
action: RxIoScheduler-2
我们将observeOn操作符挪到最后,结果是
start:RxNewThreadScheduler-1
1:RxNewThreadScheduler-1
2:RxNewThreadScheduler-1
action: RxComputationScheduler-1
4.2 Scheduler的类型
4.2.1 computation
computation Scheduler适用于和CPU有关的任务,但不适合那些会造成阻塞的任务,如访问磁盘和网络等。这是因为computation Scheduler内部会根据当前运行环境的CPU核心数来创建一个线程池,里面的每个线程会占用一个CPU的核心,从而可以充分利用CPU的计算资源。如果在computation Scheduler上进行阻塞的操作,当前的Scheduler在阻塞的时候还会占用CPU,从而造成资源的浪费。另外,由于CPU的核心数有限,所以computation Scheduler内的线程也是有限的,如果有超出CPU核心数的任务要进行,后来的任务就必须排队等待。所以在使用computation Scheduler时,我们最好也能保证同时进行的任务数量小于CPU的核心数,这样新建的任务会立刻申请到资源并且开始运行。当没有使用Scheduler的时候, 很多和时间相关的操作符,如delay、timer、skip、take、等,所创建的Observable默认就会运行在computation Scheduler上的
4.2.2 newThread
newThread Scheduler每次都会新建一个线程。一般情况下不是很推荐这个Scheduler,这是因为每次新建一个线程都会造成稍微的延迟,而且这个线程在任务结束的时候就会终结,所以也不能重用。newThread Scheduler适合那些工作时间长且总数少的任务,大多数情况下都可以使用io Scheduler来代替newThread Scheduler。
4.2.3 io
io Scheduler类似于newThread Scheduler,不同之处是io Scheduler的线程可以被回收利用。io Scheduler内部也会维持一个线程池,当使用io Scheduler来处理任务的时候, 会先从线程池中查找空闲的线程,如果有空闲线程就会在这个空闲线程上执行任务;否则就会创建一个新的线程来执行任务,并在任务执行完毕时将这个空闲的线程加入到线程池中。当然空闲的线程不会一直在那里等待,RxJava默认空闲线程的存活时间是60秒,空闲时间超过60秒的线程会被回收。
io Scheduler特别适合那些使用很少CPU资源的I/O操作。因为I/O操作一般都会花费比较长的时间来等待网络请求或读取磁盘的返回结果,所以使用一个较大的线程池会比较合适,这样新来的任务就不需要排队等待。io Scheduler所使用的线程池是不限大小的,所有如果有足够多任务同时使用io Scheduler就会导致内存不足(OOM)的错误
4.2.4 immediate
immediate Scheduler会在当前的线程上立即开始执行任务,这会将当前线程上正在进行的任务阻塞。如果用Outer代表当前线程上正在执行的任务,用Inner来代表使用immediate Scheduler的任务,他们的执行顺序会如下所示,很明显这是一种“插队”的策略。
Outer start
Inner start
Inner end
Outer end
4.2.5 trampoline
trampoline Scheduler同immediate Scheduler很像,都会在当前线程上执行任务。但是trampoline并不是立即开始执行任务的,而是等待当前线程上之前的任务都结束之后再开始执行。同样适用Outter 和Inner来分别代表当前线程上的任务和使用trampoline Scheduler的任务:
Outer start
Outer end
Inner start
Inner end
4.2.6 from
RxJava内置的各种Scheduler可以满足绝大部分使用需求,但是不排除有一些特殊的需求无法被满足,这时候我们可以使用Schedulers.from(Executor executor) 工厂方法来跟进我们提供的Executor创建Scheduler。
首先创建一个类实现ThreadFactory接口,然后新建一个Executor对象,这是核心和最大线程池大小为2,将空闲线程的存活时间设置为2秒,使用LinkedBlockingQueue来作为任务排队序列,使用新建的ThreadFactory来创建新的线程,最后跟进我们创建的Executor对象获取一个Scheduler对象。有了这个Scheduler对象,我们就可以和其它的Scheduler一样使用它了。
private void from(){
Executor executor = new ThreadPoolExecutor(
2,
2,
2000L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(1000),
new SimpleThreadFactory()
);
Observable.interval(0,1, TimeUnit.SECONDS)
.take(5)
.observeOn(Schedulers.from(executor))
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong + "-" + Thread.currentThread().getName());
}
});
}
结果
0-Thread-1
应该是interval这个方法使用的错误了, 所以才会调用一个。待研究。
网友评论