美文网首页
webflux的delay原理详解

webflux的delay原理详解

作者: 二当家的黑板报 | 来源:发表于2019-08-11 12:46 被阅读0次

    反应式编程一开始是从前端和客户端开始兴起,现在大有蔓延到后端的趋势,Spring5推出的webflux就是反应式编程的产物。

    webflux对比于springMVC,性能高出很多,网上已经有很多的测评,不再在过多说明。

    左图同步,右图异步

    上图看出对比于同步,异步所用的线程是比较少的,不过有个前提是,程序逻辑中有阻塞(如io阻塞等),且这种阻塞是可以异步化的。

    为了满足这个前提,反应式编程框架就必须将这些阻塞变成异步化,如新出的WebClient工具就是将http请求io异步化。

    delay方法就是用来代替sleep方法的,下面来讲解一下delay方法是怎么将延时异步化的。

    源码解读

    • 通过查看Mono<Long> delay(Duration duration)方法源码,它会构造一个MonoDelay类,并通过传入全局公用的调度器Schedulers.parallel()来调度里面的异步任务。
        public static Mono<Long> delay(Duration duration) {
            return delay(duration, Schedulers.parallel());
        }
    
        public static Mono<Long> delay(Duration duration, Scheduler timer) {
            return onAssembly(new MonoDelay(duration.toMillis(), TimeUnit.MILLISECONDS, timer));
        }
    
    • 查看MonoDelay类的订阅方法subscribe
    public void subscribe(CoreSubscriber<? super Long> actual) {
        MonoDelayRunnable r = new MonoDelayRunnable(actual);
    
        actual.onSubscribe(r);
    
        try {
        //重点在于下面的 timedScheduler.schedule(r, delay, unit)
        //通过timedScheduler来调度延时任务,而不是当前线程阻塞等待
            r.setCancel(timedScheduler.schedule(r, delay, unit));
        }
        catch (RejectedExecutionException ree) {
            if(r.cancel != OperatorDisposables.DISPOSED) {
                actual.onError(Operators.onRejectedExecution(ree, r, null, null,
                        actual.currentContext()));
            }
        }
    }
    
    • 查看ParallelScheduler的delay方法:
    public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
      //pick方法会获取一个ScheduledExecutorService线程执行器给到Schedulers使用
     return Schedulers.directSchedule(pick(), task, delay, unit);
    }
    
    • 查看directSchedule方法:
    static Disposable directSchedule(ScheduledExecutorService exec,
          Runnable task,
          long delay,
          TimeUnit unit) {
      //包装任务
       SchedulerTask sr = new SchedulerTask(task);
       Future<?> f;
       if (delay <= 0L) {
          f = exec.submit((Callable<?>) sr);
       }
       else {
         //延时调度
         //ScheduledExecutorService是java自带的并发调度接口,
         //通过一条线程轮询延时队列来避免所有线程阻塞
          f = exec.schedule((Callable<?>) sr, delay, unit);
       }
      //设置结果
       sr.setFuture(f);
    
       return sr;
    }
    

    自此就可以知道为什么delay方法没有阻塞线程,因为它的延时处理都交给了ScheduledExecutorService执行器处理,调用delay方法的主线程就直接返回了,等到延时时间过后,ScheduledExecutorService就会从线程池就获取一个线程来处理延时后的任务逻辑。整个流程就类似于上面图片中的右图。

    通过反应式编程范式,将所有阻塞都修改为类似于delay之于sleep的形式,就能大幅度提升服务性能了。

    查看原文 深入了解更多知识。

    相关文章

      网友评论

          本文标题:webflux的delay原理详解

          本文链接:https://www.haomeiwen.com/subject/ffojjctx.html