美文网首页
Project Reactor源码分析2-线程切换

Project Reactor源码分析2-线程切换

作者: 王侦 | 来源:发表于2023-03-13 21:36 被阅读0次

    4种线程调度器

    • 使用当前线程的 Schedulers.immediate()
    • 一个被复用的单一线程 Schedulers.single(),如果希望是创建新线程的模式请使用 Schedulers.newSingle()
    • 一个弹性线程池 Schedulers.elastic(),这个非常适合处理一些 I/O blocking 实践
    • 一个固定Wokers的线程池 Schedulers.parallel() 这个会创建和 CPU 核心数一样的线程池

    进行线程切换的函数有两个

    • publishOn : 这个函数将影响之后的操作
    • subscribeOn : 这个函数仅仅影响 事件源 所在的线程
            Flux.just("tom")
                    .map(s -> {
                        System.out.println("(concat @qq.com) at [" + Thread.currentThread() + "]");
                        return s.concat("@qq.com");
                    })
                    .publishOn(Schedulers.newSingle("thread-a"))
                    .map(s -> {
                        System.out.println("(to string) at [" + Thread.currentThread() + "]");
                        return s;
                    })
                    .subscribeOn(Schedulers.newSingle("source"))
                    .subscribe(System.out::println);
    

    结果:

    (concat @qq.com) at [Thread[source-1,5,main]]
    (concat foo) at [Thread[thread-a-3,5,main]]
    (startsWith f) at [Thread[thread-a-3,5,main]]
    (to string) at [Thread[thread-b-2,5,main]]
    tom@qq.com-foo
    

    1.声明阶段

    2.subscribe、onSubscribe、request阶段

    2.1 LambdaSubscriber@onSubscribe (subscribeOn线程影响)

    FluxSubscribeOn#subscribeOrReturn

        public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
            Worker worker = Objects.requireNonNull(scheduler.createWorker(),
                    "The scheduler returned a null Function");
    
            SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<>(source,
                    actual, worker, requestOnSeparateThread);
            actual.onSubscribe(parent);
    
            try {
                worker.schedule(parent);
            }
            catch (RejectedExecutionException ree) {
                if (parent.s != Operators.cancelledSubscription()) {
                    actual.onError(Operators.onRejectedExecution(ree, parent, null, null,
                            actual.currentContext()));
                }
            }
            return null;
        }
    

    LambdaSubscriber@onSubscribe:

    SubscribeOnSubscriber#request(Long.MAX_VALUE)

    • 啥时都没干

    ExecutorServiceWorker#schedule(SubscribeOnSubscriber)

    • 进入FluxSubscribeOn带的线程
    • SubscribeOnSubscriber#run
    • source.subscribe(this),也即是FluxMapFuseable#subscribe(SubscribeOnSubscriber),这里又重新进入了subscribe阶段

    2.2 subscribe阶段

    FluxMapFuseable#subscribe(SubscribeOnSubscriber)

    • 一路构建下去

    2.3 onSubscribe

    2.4 request及执行

    从最外层往里面执行:

    • SubscribeOnSubscriber#requestUpstream()
    • MapFuseableSubscriber#request(Long.MAX_VALUE)
    • PublishOnSubscriber#request()
    • trySchedule(this, null, null),调用worker.schedule(this)
    • PublishOnSubscriber#run
    • MapFuseableSubscriber#onNext(t),t是tom@qq.com,调用mapper.apply(t)
    • SubscribeOnSubscriber#onNext
    • LambdaSubscriber#onNext

    3.总结

    • 1)在subscribe的时候进行线程切换,subscribeOn()使得它上游的订阅阶段以及整个消费阶段异步执行。
      A)创建了SubscribeOnSubscriber对象,它同时是一个任务。
      B)在当前线程调用下游onSubscribe()方法。
      C)使用Woker异步执行SubscribeOnSubscriber任务,实际执行SubscribeOnSubscriber#run()方法。
      D)SubscribeOnSubscriber#onSubscribe(),重点是调用了requestUpstream()方法,由当前线程还是Worker向上游请求数据。

    • 2)在onNext()、onComplete()、onError()方法进行线程切换,publishOn()使得它下游的消费阶段异步执行。

    参考

    相关文章

      网友评论

          本文标题:Project Reactor源码分析2-线程切换

          本文链接:https://www.haomeiwen.com/subject/icpcrdtx.html