美文网首页
4、webflux

4、webflux

作者: lesline | 来源:发表于2018-10-15 08:35 被阅读73次

Spring WebFlux 概述

图一:

webflux1.png

图二:


webflux2.png

从图可以看出对支持Spring 5的Spring Boot 2.0来说,新加入的响应式技术栈是其主打核心特性。

左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,从上到下依次是Router Functions,WebFlux,Reactive Streams三个新组件。
Router Functions: 对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。

在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux格式,以便进行统一处理。

值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。

示例:

1、增加依赖

  <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId> 
    </dependency>

2、DAO层:

import com.getset.webfluxdemo.model.User;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Mono;

public interface UserRepository extends ReactiveCrudRepository<User, String> {
    Mono<User> findByUsername(String username);
    Mono<Long> deleteByUsername(String username);
}

3、service层:

@Service
public class UserService {
    @Autowired
    private UserRepository userRepository;

    public Flux<User> findAll() {
        return userRepository.findAll().log();
    }
    public Mono<Long> deleteByUsername(String username) {
        return userRepository.deleteByUsername(username);
    }
    public Mono<User> findByUsername(String username) {
        return userRepository.findByUsername(username);
    }
}

4、Spring MVC注解声明Reactive Controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping(“/user”)
public class UserController {
    @Autowired
    private UserService userService;

    @DeleteMapping(“/{username}”)
    public Mono<Long> deleteByUsername(@PathVariable String username) {
        return this.userService.deleteByUsername(username);
    }
    @GetMapping(“/{username}”)
    public Mono<User> findByUsername(@PathVariable String username) {
        return this.userService.findByUsername(username);
    }

    @GetMapping(value = “”, produces = MediaType./APPLICATION_STREAM_JSON_VALUE/)
    public Flux<User> findAll() {
        return this.userService.findAll().delayElements(Duration./ofSeconds/(1));
    }
}

5、Router Functions接口实现

import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class AllRouters {

    @Bean
    RouterFunction<ServerResponse> userRouter(UserHandler handler) {
        return nest(
                // 相当于类上面的 @RequestMapping("/user")
                path("/user"),
                // 下面的相当于类里面的 @RequestMapping
                // 得到所有用户
                route(GET("/"), handler::getAllUser)
                        // 创建用户
                        .andRoute(POST("/").and(accept(MediaType.APPLICATION_JSON_UTF8)),
                                handler::createUser)
                        // 删除用户
                        .andRoute(DELETE("/{id}"), handler::deleteUserById));
    }
}
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class UserHandler {
    private final UserRepository repository;
    public UserHandler(UserRepository rep) {
        this.repository = rep;
    }
    /**
     * 得到所有用户
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request) {
        return ok().contentType(APPLICATION_JSON_UTF8)
                .body(this.repository.findAll(), User.class);
    }

    /**
     * 创建用户
     */
    public Mono<ServerResponse> createUser(ServerRequest request) {
        // 2.0.0 是可以工作, 但是2.0.1 下面这个模式是会报异常
        Mono<User> user = request.bodyToMono(User.class);

        return user.flatMap(u -> {
            // 校验代码需要放在这里
            CheckUtil.checkName(u.getName());

            return ok().contentType(APPLICATION_JSON_UTF8)
                    .body(this.repository.save(u), User.class);
        });
    }

    /**
     * 根据id删除用户
     */
    public Mono<ServerResponse> deleteUserById(ServerRequest request) {
        String id = request.pathVariable("id");

        return this.repository.findById(id)
                .flatMap(
                        user -> this.repository.delete(user).then(ok().build()))
                .switchIfEmpty(notFound().build());
    }

}

6、实现持续推送消息:mongDB-tail

@RestController
@RequestMapping("/events")
public class MyEventController {
    @Autowired
    private MyEventRepository myEventRepository;

    @GetMapping(path = "", produces = MediaType./APPLICATION_STREAM_JSON_VALUE/)
    public Flux<MyEvent> getEvents() {
        return this.myEventRepository.findBy();
    }
}

import com.getset.webfluxdemo.model.MyEvent;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.mongodb.repository.Tailable;
import reactor.core.publisher.Flux;

public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> {
    @Tailable
    Flux<MyEvent> findBy();
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "event")
public class MyEvent {
    @Id
    private Long id;
    private String message;
}

(5)Spring WebFlux快速上手——响应式Spring的道法术器 - 刘康的专栏 - CSDN博客

响应式Spring Data

开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。
目前Spring Data支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。
Spring Boot 2.0 有两条不同的线:
Spring Web MVC -> Spring Data
Spring WebFlux -> Spring Data Reactive

1、对于Spring Data Reactive原,来的 Spring 针对 Spring Data (JDBC等)的事务管理肯定不起作用了。因为原来的 Spring 事务管理(Spring Data JPA)都是基于 ThreadLocal 传递事务的,其本质是基于 阻塞 IO 模型,不是异步的。但 Reactive 是要求异步的,不同线程里面 ThreadLocal 肯定取不到值了。如果想在Reactive 编程中做到事务,通过在参数上面传递 conn,复杂度较高。
2、对于Spring Data,想使用反应式编程,可以通过协程或线程异步集成。使用时注意:spring申明式事务管理时,线程边界保证事务在同一个线程中。

          public Flux<Order> findAll() {
              return Flux.fromCallable(
                              () ->  orderRepository.findAll()
                         ).subscribeOn(Schedulers.elastic());
       }
       orderRepository.findAll方法返回值类型List<Order>

为啥只能运行在 Servlet 3.1+ 容器

大家知道,3.1 规范其中一个新特性是异步处理支持。
异步处理支持:Servlet 线程不需一直阻塞,即不需要到业务处理完毕再输出响应,然后结束 Servlet线程。异步处理的作用是在接收到请求之后,Servlet 线程可以将耗时的操作委派给另一个线程来完成,在不生成响应的情况下返回至容器。主要应用场景是针对业务处理较耗时的情况,可以减少服务器资源的占用,并且提高并发处理速度。
WebFlux的实现需要容器的异步支持,所以 WebFlux 支持的容器可以是 Tomcat、Jetty(Non-Blocking IO API) ,也可以是 Netty 和 Undertow,其本身就支持异步容器。在容器中 Spring WebFlux 会将输入流适配成 Mono 或者 Flux 格式进行统一处理。


参考:

官网:
Web on Reactive Stack
23. WebFlux framework

响应式Spring的道法术器:
响应式Spring的道法术器(Spring WebFlux 教程) - CSDN博客
响应式Spring的道法术器(Spring WebFlux 快速上手 + 全面介绍)-刘康的博客-51CTO博客

使用 Spring 5 的 WebFlux 开发反应式 Web 应用
聊聊 Spring Boot 2.0 的 WebFlux | 泥瓦匠-右侧关注我的公众号吧

相关文章

网友评论

      本文标题:4、webflux

      本文链接:https://www.haomeiwen.com/subject/iemxzftx.html