美文网首页
WebFlux使用笔记

WebFlux使用笔记

作者: 数字d | 来源:发表于2022-03-16 14:34 被阅读0次

WebFlux是响应式Web框架,可以在资源有限的情况下提高系统的吞吐量和伸缩性,不是提高性能,在资源相同的情况下,webFlux可以处理更多的请求。
Spring MVC 采用命令式编程的方式,代码被一句一句执行,便于开发者调试。WebFlux是基于异步响应式编程。
MVC的工作流程:主线程接收到请求->准备数据->返回数据。整个过程是单线程阻塞的,用户会感觉等待时间很长是因为,在处理好之后才返回数据给浏览器。因此,如果请求有很多,则吞吐量上不去。
WebFlux工作流程是:主线程接收到请求->立刻返回数据与函数的组合(Mono或者Flux,不是结果)->开启一个新的Work线程去做实际的数据准备工作,进行真正的业务操作->Work线程完成工作->返回用户真实数据。
Mono和Flux是Reactor的两个基本概念。
Mono和Flux属于事件发布者,为消费者提供订阅接口。当有事件发生的时候,Mono或Flux会回调消费者的相应方法,然后通知消费者相应的事件。这也是响应式编程的模型。
Mono和Flux用于处理数据异步流,它不像MVC 中那样直接返回String/List,而是将异步数据流包装成Mono或者Flux对象。


开发MVC下WebFlux的流程

1.配置依赖pom.xml

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

2.编写控制器

@RestController
public class HelloWorldCtrl {
    @RequestMapping("/hellw")
    public Mono<String> helloworld(){
        return Mono.just("hello this mono world");
    }
}

3.用注解式开发实现数据的增删改查
类的实现

package com.example.demo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private long id;
    private String name;
    private int age;
}

控制器的实现


import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

@RestController
@RequestMapping("/user")
public class UserController {
    Map<Long,User> users = new HashMap<>();

    @PostConstruct
    public void init() throws Exception{
        System.out.println("i am in new array ");
        users.put(Long.valueOf(1),new User(1,"longzhonghua",2));
        users.put(Long.valueOf(2),new User(2,"longzhiran",2));
    }

    /**
     * 获取所有用户
     * */
    @GetMapping("/list1")
    public Flux<User> getAll(){
        return Flux.fromIterable(users.entrySet().stream()
                .map(entry->entry.getValue())
                .collect(Collectors.toList()));
    }
    /**
     * 获取单个用户
    **/
    @RequestMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id){
        return Mono.justOrEmpty(users.get(id));
    }

    /**
     * 创建用户
     * **/
    @PostMapping("")
    public Mono<ResponseEntity<String>>  addUser(User user){
        users.put(user.getId(),user);
        return Mono.just(new ResponseEntity<>("添加成功", HttpStatus.CREATED));
    }

    /**
     * 修改用户
     * **/
    @PutMapping("/{id}")
    public Mono<ResponseEntity<User>> putUser(@PathVariable Long id,User user){
        System.out.println("i am in put mapping");
        user.setId(id);
        users.put(id,user);
        return Mono.just(new ResponseEntity<>(user,HttpStatus.CREATED));
    }

    /**
     * 删除用户
     * **/
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<String>> deleteUser(@PathVariable Long id){
        users.remove(id);
        return Mono.just(new ResponseEntity<>("删除成功",HttpStatus.ACCEPTED));
    }
}

测试API的功能,获取数据,修改数据,修改数据的方法是通过PUT方法访问/user/1,提交相应的name和age字段来修改内容
删除数据,可以通过DELETE方式访问/user/1,然后删除其中的内容

响应式开发方式开发WebFlux

处理器类Handler

package com.example.demo;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class HelloWorldHandler {
    public Mono<ServerResponse> sayHelloWord(ServerRequest  serverRequest){
        return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("this is webFlus demo"),String.class);
    }
}

路由器类Router

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Component
public class Router {
    @Autowired
    private HelloWorldHandler  helloWorldHandler;
    @Bean
    public RouterFunction<ServerResponse> getString(){
        return route(GET("/helloworld"),req->helloWorldHandler.sayHelloWord(req));
    }
}

使用WebFlux运行时候出现端口占用查看和kill

lsof -i tcp:8080
kill 842

WebFlux模式操作MongoDB数据库,实现数据的增删改查

添加依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

配置文件中Application.properities配置MongoDB的地址信息

spring.data.mongodb.uri=mongodb://127.0.0.1:27017/test

创建实体类Usr.class

package com.example.demo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Usr {
    @Id
    private String id;
    private String name;
    private int age;
}

接口实现

package com.example.demo;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface UsrRepository extends ReactiveMongoRepository<Usr,String> {
}

增删改查的API实现

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@RestController
@RequestMapping(path = "/user")
public class UserCtl {
    @Autowired
    private UsrRepository usrRepository;

    @GetMapping(value = "/list")
    public Flux<Usr> getAll(){
        return usrRepository.findAll();
    }

    @GetMapping(value = "/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Usr> getAlldelay(){
        return usrRepository.findAll().delayElements(Duration.ofSeconds(1));
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<Usr>> getUsr(@PathVariable String id){
        return usrRepository.findById(id)
                .map(getUsr->ResponseEntity.ok(getUsr))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PostMapping("")
    public Mono<Usr> creatUser(@Validated Usr usr){
        return usrRepository.save(usr);
    }

    public Mono updateUser(@PathVariable(value = "id") String id,@Validated Usr usr){
        return usrRepository.findById(id)
                .flatMap(
                        existingUser->{existingUser.setName(usr.getName());
                            return usrRepository.save(existingUser);
                        })
                .map(updateUser->new ResponseEntity<>(updateUser, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> deleteUser(@PathVariable(value = "id") String id){
        return usrRepository.findById(id)
                .flatMap(existingUser->usrRepository.delete(existingUser)
                        .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }
}

其他
在执行启动的时候需要先创建两个log文件,创建方法是

touch mongo.log

也不知道这里为什么不能用 echo "" > mongo.log.

mongo的启动,depath数据路径,logpath日志 路径 ,--fork后台运行

mongod --dbpath /usr/local/mongodb --logpath /usr/local/mongodb/mongo.log --fork

启动后的查看

ps aux | grep mongo

mongoDB中的可视化数据


mongodb.png

特殊的请求方式
如果使用postman的常规方式数据请求,就会出现阻塞调用的情况:


阻塞到数据流结束或超时,一次返回.png

因为涉及到了响应式
数据流的请求方式,这里postman和APIFox不支持,直接使用命令行来进行网络请求

curl -i localhost:8080/user/listdelay

这里的演示效果如图:


每一秒返回一次,滚动返回数据,直到数据结束.gif

相关文章

网友评论

      本文标题:WebFlux使用笔记

      本文链接:https://www.haomeiwen.com/subject/ccfydrtx.html