美文网首页
Project Reactor源码分析2-线程切换

Project Reactor源码分析2-线程切换

作者: 王侦 | 来源:发表于2023-03-13 21:36 被阅读0次

4种线程调度器

  • 使用当前线程的 Schedulers.immediate()
  • 一个被复用的单一线程 Schedulers.single(),如果希望是创建新线程的模式请使用 Schedulers.newSingle()
  • 一个弹性线程池 Schedulers.elastic(),这个非常适合处理一些 I/O blocking 实践
  • 一个固定Wokers的线程池 Schedulers.parallel() 这个会创建和 CPU 核心数一样的线程池

进行线程切换的函数有两个

  • publishOn : 这个函数将影响之后的操作
  • subscribeOn : 这个函数仅仅影响 事件源 所在的线程
        Flux.just("tom")
                .map(s -> {
                    System.out.println("(concat @qq.com) at [" + Thread.currentThread() + "]");
                    return s.concat("@qq.com");
                })
                .publishOn(Schedulers.newSingle("thread-a"))
                .map(s -> {
                    System.out.println("(to string) at [" + Thread.currentThread() + "]");
                    return s;
                })
                .subscribeOn(Schedulers.newSingle("source"))
                .subscribe(System.out::println);

结果:

(concat @qq.com) at [Thread[source-1,5,main]]
(concat foo) at [Thread[thread-a-3,5,main]]
(startsWith f) at [Thread[thread-a-3,5,main]]
(to string) at [Thread[thread-b-2,5,main]]
tom@qq.com-foo

1.声明阶段

2.subscribe、onSubscribe、request阶段

2.1 LambdaSubscriber@onSubscribe (subscribeOn线程影响)

FluxSubscribeOn#subscribeOrReturn

    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        Worker worker = Objects.requireNonNull(scheduler.createWorker(),
                "The scheduler returned a null Function");

        SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
                actual, worker, requestOnSeparateThread);
        actual.onSubscribe(parent);

        try {
            worker.schedule(parent);
        }
        catch (RejectedExecutionException ree) {
            if (parent.s != Operators.cancelledSubscription()) {
                actual.onError(Operators.onRejectedExecution(ree, parent, null, null,
                        actual.currentContext()));
            }
        }
        return null;
    }

LambdaSubscriber@onSubscribe:

SubscribeOnSubscriber#request(Long.MAX_VALUE)

  • 啥时都没干

ExecutorServiceWorker#schedule(SubscribeOnSubscriber)

  • 进入FluxSubscribeOn带的线程
  • SubscribeOnSubscriber#run
  • source.subscribe(this),也即是FluxMapFuseable#subscribe(SubscribeOnSubscriber),这里又重新进入了subscribe阶段

2.2 subscribe阶段

FluxMapFuseable#subscribe(SubscribeOnSubscriber)

  • 一路构建下去

2.3 onSubscribe

2.4 request及执行

从最外层往里面执行:

  • SubscribeOnSubscriber#requestUpstream()
  • MapFuseableSubscriber#request(Long.MAX_VALUE)
  • PublishOnSubscriber#request()
  • trySchedule(this, null, null),调用worker.schedule(this)
  • PublishOnSubscriber#run
  • MapFuseableSubscriber#onNext(t),t是tom@qq.com,调用mapper.apply(t)
  • SubscribeOnSubscriber#onNext
  • LambdaSubscriber#onNext

3.总结

  • 1)在subscribe的时候进行线程切换,subscribeOn()使得它上游的订阅阶段以及整个消费阶段异步执行。
    A)创建了SubscribeOnSubscriber对象,它同时是一个任务。
    B)在当前线程调用下游onSubscribe()方法。
    C)使用Woker异步执行SubscribeOnSubscriber任务,实际执行SubscribeOnSubscriber#run()方法。
    D)SubscribeOnSubscriber#onSubscribe(),重点是调用了requestUpstream()方法,由当前线程还是Worker向上游请求数据。

  • 2)在onNext()、onComplete()、onError()方法进行线程切换,publishOn()使得它下游的消费阶段异步执行。

参考

相关文章

网友评论

      本文标题:Project Reactor源码分析2-线程切换

      本文链接:https://www.haomeiwen.com/subject/icpcrdtx.html