4种线程调度器
- 使用当前线程的 Schedulers.immediate()
- 一个被复用的单一线程 Schedulers.single(),如果希望是创建新线程的模式请使用 Schedulers.newSingle()
- 一个弹性线程池 Schedulers.elastic(),这个非常适合处理一些 I/O blocking 实践
- 一个固定Wokers的线程池 Schedulers.parallel() 这个会创建和 CPU 核心数一样的线程池
进行线程切换的函数有两个
- publishOn : 这个函数将影响之后的操作
- subscribeOn : 这个函数仅仅影响 事件源 所在的线程
Flux.just("tom")
.map(s -> {
System.out.println("(concat @qq.com) at [" + Thread.currentThread() + "]");
return s.concat("@qq.com");
})
.publishOn(Schedulers.newSingle("thread-a"))
.map(s -> {
System.out.println("(to string) at [" + Thread.currentThread() + "]");
return s;
})
.subscribeOn(Schedulers.newSingle("source"))
.subscribe(System.out::println);
结果:
(concat @qq.com) at [Thread[source-1,5,main]]
(concat foo) at [Thread[thread-a-3,5,main]]
(startsWith f) at [Thread[thread-a-3,5,main]]
(to string) at [Thread[thread-b-2,5,main]]
tom@qq.com-foo
1.声明阶段
2.subscribe、onSubscribe、request阶段
2.1 LambdaSubscriber@onSubscribe (subscribeOn线程影响)
FluxSubscribeOn#subscribeOrReturn
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
Worker worker = Objects.requireNonNull(scheduler.createWorker(),
"The scheduler returned a null Function");
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
actual, worker, requestOnSeparateThread);
actual.onSubscribe(parent);
try {
worker.schedule(parent);
}
catch (RejectedExecutionException ree) {
if (parent.s != Operators.cancelledSubscription()) {
actual.onError(Operators.onRejectedExecution(ree, parent, null, null,
actual.currentContext()));
}
}
return null;
}
LambdaSubscriber@onSubscribe:
SubscribeOnSubscriber#request(Long.MAX_VALUE)
- 啥时都没干
ExecutorServiceWorker#schedule(SubscribeOnSubscriber)
- 进入FluxSubscribeOn带的线程
- SubscribeOnSubscriber#run
- source.subscribe(this),也即是FluxMapFuseable#subscribe(SubscribeOnSubscriber),这里又重新进入了subscribe阶段
2.2 subscribe阶段
FluxMapFuseable#subscribe(SubscribeOnSubscriber)
- 一路构建下去
2.3 onSubscribe
2.4 request及执行
从最外层往里面执行:
- SubscribeOnSubscriber#requestUpstream()
- MapFuseableSubscriber#request(Long.MAX_VALUE)
- PublishOnSubscriber#request()
- trySchedule(this, null, null),调用worker.schedule(this)
- PublishOnSubscriber#run
- MapFuseableSubscriber#onNext(t),t是tom@qq.com,调用mapper.apply(t)
- SubscribeOnSubscriber#onNext
- LambdaSubscriber#onNext
3.总结
-
1)在subscribe的时候进行线程切换,subscribeOn()使得它上游的订阅阶段以及整个消费阶段异步执行。
A)创建了SubscribeOnSubscriber对象,它同时是一个任务。
B)在当前线程调用下游onSubscribe()方法。
C)使用Woker异步执行SubscribeOnSubscriber任务,实际执行SubscribeOnSubscriber#run()方法。
D)SubscribeOnSubscriber#onSubscribe(),重点是调用了requestUpstream()方法,由当前线程还是Worker向上游请求数据。 -
2)在onNext()、onComplete()、onError()方法进行线程切换,publishOn()使得它下游的消费阶段异步执行。
网友评论