WebFlux是响应式Web框架,可以在资源有限的情况下提高系统的吞吐量和伸缩性,不是提高性能,在资源相同的情况下,webFlux可以处理更多的请求。
Spring MVC 采用命令式编程的方式,代码被一句一句执行,便于开发者调试。WebFlux是基于异步响应式编程。
MVC的工作流程:主线程接收到请求->准备数据->返回数据。整个过程是单线程阻塞的,用户会感觉等待时间很长是因为,在处理好之后才返回数据给浏览器。因此,如果请求有很多,则吞吐量上不去。
WebFlux工作流程是:主线程接收到请求->立刻返回数据与函数的组合(Mono或者Flux,不是结果)->开启一个新的Work线程去做实际的数据准备工作,进行真正的业务操作->Work线程完成工作->返回用户真实数据。
Mono和Flux是Reactor的两个基本概念。
Mono和Flux属于事件发布者,为消费者提供订阅接口。当有事件发生的时候,Mono或Flux会回调消费者的相应方法,然后通知消费者相应的事件。这也是响应式编程的模型。
Mono和Flux用于处理数据异步流,它不像MVC 中那样直接返回String/List,而是将异步数据流包装成Mono或者Flux对象。
开发MVC下WebFlux的流程
1.配置依赖pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2.编写控制器
@RestController
public class HelloWorldCtrl {
@RequestMapping("/hellw")
public Mono<String> helloworld(){
return Mono.just("hello this mono world");
}
}
3.用注解式开发实现数据的增删改查
类的实现
package com.example.demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private long id;
private String name;
private int age;
}
控制器的实现
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/user")
public class UserController {
Map<Long,User> users = new HashMap<>();
@PostConstruct
public void init() throws Exception{
System.out.println("i am in new array ");
users.put(Long.valueOf(1),new User(1,"longzhonghua",2));
users.put(Long.valueOf(2),new User(2,"longzhiran",2));
}
/**
* 获取所有用户
* */
@GetMapping("/list1")
public Flux<User> getAll(){
return Flux.fromIterable(users.entrySet().stream()
.map(entry->entry.getValue())
.collect(Collectors.toList()));
}
/**
* 获取单个用户
**/
@RequestMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id){
return Mono.justOrEmpty(users.get(id));
}
/**
* 创建用户
* **/
@PostMapping("")
public Mono<ResponseEntity<String>> addUser(User user){
users.put(user.getId(),user);
return Mono.just(new ResponseEntity<>("添加成功", HttpStatus.CREATED));
}
/**
* 修改用户
* **/
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> putUser(@PathVariable Long id,User user){
System.out.println("i am in put mapping");
user.setId(id);
users.put(id,user);
return Mono.just(new ResponseEntity<>(user,HttpStatus.CREATED));
}
/**
* 删除用户
* **/
@DeleteMapping("/{id}")
public Mono<ResponseEntity<String>> deleteUser(@PathVariable Long id){
users.remove(id);
return Mono.just(new ResponseEntity<>("删除成功",HttpStatus.ACCEPTED));
}
}
测试API的功能,获取数据,修改数据,修改数据的方法是通过PUT方法访问/user/1,提交相应的name和age字段来修改内容
删除数据,可以通过DELETE方式访问/user/1,然后删除其中的内容
响应式开发方式开发WebFlux
处理器类Handler
package com.example.demo;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Component
public class HelloWorldHandler {
public Mono<ServerResponse> sayHelloWord(ServerRequest serverRequest){
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("this is webFlus demo"),String.class);
}
}
路由器类Router
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Component
public class Router {
@Autowired
private HelloWorldHandler helloWorldHandler;
@Bean
public RouterFunction<ServerResponse> getString(){
return route(GET("/helloworld"),req->helloWorldHandler.sayHelloWord(req));
}
}
使用WebFlux运行时候出现端口占用查看和kill
lsof -i tcp:8080
kill 842
WebFlux模式操作MongoDB数据库,实现数据的增删改查
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
配置文件中Application.properities配置MongoDB的地址信息
spring.data.mongodb.uri=mongodb://127.0.0.1:27017/test
创建实体类Usr.class
package com.example.demo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Usr {
@Id
private String id;
private String name;
private int age;
}
接口实现
package com.example.demo;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface UsrRepository extends ReactiveMongoRepository<Usr,String> {
}
增删改查的API实现
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@RestController
@RequestMapping(path = "/user")
public class UserCtl {
@Autowired
private UsrRepository usrRepository;
@GetMapping(value = "/list")
public Flux<Usr> getAll(){
return usrRepository.findAll();
}
@GetMapping(value = "/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Usr> getAlldelay(){
return usrRepository.findAll().delayElements(Duration.ofSeconds(1));
}
@GetMapping("/{id}")
public Mono<ResponseEntity<Usr>> getUsr(@PathVariable String id){
return usrRepository.findById(id)
.map(getUsr->ResponseEntity.ok(getUsr))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PostMapping("")
public Mono<Usr> creatUser(@Validated Usr usr){
return usrRepository.save(usr);
}
public Mono updateUser(@PathVariable(value = "id") String id,@Validated Usr usr){
return usrRepository.findById(id)
.flatMap(
existingUser->{existingUser.setName(usr.getName());
return usrRepository.save(existingUser);
})
.map(updateUser->new ResponseEntity<>(updateUser, HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable(value = "id") String id){
return usrRepository.findById(id)
.flatMap(existingUser->usrRepository.delete(existingUser)
.then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}
其他
在执行启动的时候需要先创建两个log文件,创建方法是
touch mongo.log
也不知道这里为什么不能用 echo "" > mongo.log.
mongo的启动,depath数据路径,logpath日志 路径 ,--fork后台运行
mongod --dbpath /usr/local/mongodb --logpath /usr/local/mongodb/mongo.log --fork
启动后的查看
ps aux | grep mongo
mongoDB中的可视化数据
mongodb.png
特殊的请求方式
如果使用postman的常规方式数据请求,就会出现阻塞调用的情况:
阻塞到数据流结束或超时,一次返回.png
因为涉及到了响应式
数据流的请求方式,这里postman和APIFox不支持,直接使用命令行来进行网络请求
curl -i localhost:8080/user/listdelay
这里的演示效果如图:
每一秒返回一次,滚动返回数据,直到数据结束.gif
网友评论