美文网首页
Reactive、Reactor和webflux

Reactive、Reactor和webflux

作者: lbjfish | 来源:发表于2020-09-12 20:50 被阅读0次

    开题

    Reactor顾名思义核反应堆,光听名字就知道它有多强了,首先Reactor是异步非阻塞的,基于netty,而tomcat不是,一个请求一个线程(除了Servlet3.1以上),使用Reactor就是整个代码在执行链上也更清晰,做过前端的同学应该很有感悟,不管是jquery还是vue都是一级一级往下点,那种感觉真的很爽,当然java8也有类似体验。所以诞生之初,这个东西就不是为了java而生的,java是重语言,强调稳定性,直到jdk1.8也不愿意为reactor妥协。最终Spring按奈不住了,率先把reactor集成至自己函数库中,所以Spring的版本至少是Spring5,而jdk至少是1.7,(因为Spring5实现了很多关于响应式编程的东西),然后webflux坑很深,完全看上去像另一门语言(重点),所以学习成本相对陡峭,我更多从使用者方向去思考。
    国内使用这个技术的公司好像没几家,除了我上家公司以外(用的也不是很好),首先我知道的有阿里,当然也只是一部分技术部分,也是我同学告诉我的,然后我在这家公司刚做技术选型时注册中心还在纠结eureka还是nacos,因为我来这之前根本不会nacos,只是听过,用eureka倒是很熟,但是看到nacos支持响应式编程,我还是很开心的,加上其他一些因素还是选定了nacos。
    所以大家暂时不用担心,这个技术暂时还不会取代java命令式编程,因为兼容其他中间件还需要时间,但是也是一个警告。

    命令式编程和响应式编程区别

    命令式编程一行一个代码,我们很明确就能知道,下一行代码跟上一行代码关系,因为是按步骤一步一步往下走的,最终返回的那个结果是上面一行一行代码组合最终的呈现结果。

    而响应式编程不一样,它不会再描述每一步我们要进行的步骤,它只描述你要构建数据将要流经的管道,当数据流经管道时,可以对它们进行某种形式的修改或者使用。这样做的好处是我们不再关注每一行代码是做什么的(想象有100行代码),只需要关注管道最终返回的结果是什么,然后依据上一个管代的结果,流到我们这个管道需要做什么。每个管道都是异步非阻塞的。

    主要原因是Servlet是阻塞和多线程的,每个连接都会使用一个线程。在请求处理的时候,会在线程池中拉取一个worker线程来对请求进行处理。同时,请求线程是阻塞的,直到worker线程提示它完成为止。这也带来的后果就是阻塞式Web框架在大量请求无法有效地扩展。缓慢的worker线程所带来的延迟会使情况变得更糟,因为它将花费更长的时间才能将worker线程送回池中,准备处理另一个请求。在某些场景中,这种设计完全可以接受。事实上,这种方式也是这十年来Web应用程序的开发方式,但是时代在改变。这种方式适合以前偶尔浏览网站的人们,而现在人们会频繁消费HTTPAPI,他们会持续地和Web API交换数据。

    事件轮询请求

    数据库支持

    1. 2.3执行时间和普通比。
    2. Spring Data Reactive不支持 MySQL,进一步也不支持 MySQL 事务。所以用了 Reactive 原来的 spring 事务管理就不好用了。jdbc jpa 的事务是基于阻塞 IO 模型的,如果 Spring Data Reactive 没有升级 IO 模型去支持 JDBC,生产上的应用只能使用不强依赖事务的。

    Reactor的主要类

    在Reactor中,经常使用的类并不多,主要有以下两个:

    • Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者(Publisher)
    • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发布者

    Publisher

    Mono和Flux都是Publisher,发布者起到发送流数据作用。

    Subscriber

    1.Subscriber,因为一次只请求一个元素会导致本身效率低下。
    2.为了验证是不是一次请求一个元素,fromInter 或 range。
    onComplete因为是多线程,为了防止发布者和订阅者结束后有个通知,否则会造成周期竞争。
    onComplete或onError都会触发终止订阅

    Subscription 和 Processor

    发布者、订阅者关系流程

    Backpressure

    指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略,可能最终导致溢出。subscription

    doOnSubscribe 和 doOnNext

    1. doOnSubscribe是事件被订阅之前(也就是事件源发起之前)会调用的方法, 它一般执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
    2. doOnNext是观察者被通知之前(也就是回调之前)会调用的方法,说白了就是最终回调之前的前一个回调方法,这个方法一般做的事件类似于观察者做的事情,只是自己不是最终的回调者。(观察者即最终回调者)

    Mono和Flux

    开发者应只关注Publisher ,如果开发中间件,redis、dubbo,甚至nacos, Web Flux 则会自动帮我们实现 Subscriber

    Flux类的静态方法

    • just():可以指定序列中包含的全部元素。
    • fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
    • empty():创建一个不包含任何元素,只发布结束消息的序列。
    • error(Throwable error):创建一个只包含错误消息的序列。
    • range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
    • interval(Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
    • concat,类似于Mono的zip(但是不一样,这个不会返回tuple)。
    • concatWith,类似于Mono的zipWith(但是不一样,这个不会返回tuple)。
    • concatMapIterable(Arrays.asList),会在Flux.just(1,2,3)每个元素中逐个穿插指定集合元素,或对当前Flux进行数据操作(比如逐个元素加2)。
    • defer():一种懒创建方式,对比just。
    • MathFlux.sumInt(Flux.range),这是新出的一种封装,MathFlux有多种函数计算实现,可以根据不同场景选型。替换可.as()。也可替换reduce也能实现上述功能,但是reduce功能更加强大,不仅可以对数字甚至任何类型都可以。
    • generate() 方法同步和逐一的方式来产生 Flux 序列,next()方法只能最多被调用一次,不调用 complete()方法,所产生的是一个无限序列。
    • create() 跟上面类似,只是它允许有多个元素。
    • buffer(int) 和 bufferUntil(), 这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件。
    • concatWith(Flux)、onErrorResume()、onErrorReturn、doFinally 和 retry() 异常处理对比java。
    • collectList() 和 collect(Collectors.toList()) ,Mono转Flux。

    Mono类的静态方法

    • zipWith():不需要上一个Mono的结果(类型可以不一样)。
    • zipWhen():需要上一个Mono的结果(类型可以不一样)。
    • zip():组装多个Mono(类型可以不一样)。
    • flatMapMany():Mono转Flux。
    • delayElement,类似于Thread.sleep,可以结合map(同步)、flatMap(异步)。

    Flux和Mono共有方法

    • transform():抽出公共部分组装。
    • defer():同Flux。
    • publishOn(Schedulers) 和 subscribeOn(Schedulers),可以动态切换线程,可以结合buffer、log使用。

    Schedulers 类有如下几种对上下文操作的静态方法:

    • immediate():无执行上下文,提交的Runnable将直接在原线程上执行,可以理解没有调度
    • single():可重用单线程,使用一个线程处理所有请求
    • elastic(): 没有边界的弹性线程池
    • boundedElastic():有边界弹性线程池,设置线程限制,默认为cpu核心数*10。达到上限后最多可以提交10万个任务。是阻塞线程的方法
    • parallel(): 固定线程数量的并行线程池,线程数量和cpu内核一样多

    WebFlux

    RouterFunction 类似 Spring Web 的 @RequestMapping 。RouterFunction 用来定义 Spring 5 应用的路由信息。RouterFunctions 助手类包含一些有用的方法,例如 route 定义路由并构建 RouterFunction 对象。RequestPredicates 包含大量有用的方法如 GET, POST, path, queryParam ,accept, headers, contentType 等等,可用来定义路由和构建 RouterFunction。每个 Route 映射到一个处理方法,当接收到 HttpRequest 请求的时候就会调用。

    Mono<ServerResponse> 是在配置控制器方法中返回的,而不是controller。

    RouterFunction 为应用程序提供了 DSL 风格的路由功能。此时,Spring 并不支持两种风格混合使用。

    相关文章

      网友评论

          本文标题:Reactive、Reactor和webflux

          本文链接:https://www.haomeiwen.com/subject/mrxpektx.html