美文网首页
WebFlux使用笔记

WebFlux使用笔记

作者: 数字d | 来源:发表于2022-03-16 14:34 被阅读0次

    WebFlux是响应式Web框架,可以在资源有限的情况下提高系统的吞吐量和伸缩性,不是提高性能,在资源相同的情况下,webFlux可以处理更多的请求。
    Spring MVC 采用命令式编程的方式,代码被一句一句执行,便于开发者调试。WebFlux是基于异步响应式编程。
    MVC的工作流程:主线程接收到请求->准备数据->返回数据。整个过程是单线程阻塞的,用户会感觉等待时间很长是因为,在处理好之后才返回数据给浏览器。因此,如果请求有很多,则吞吐量上不去。
    WebFlux工作流程是:主线程接收到请求->立刻返回数据与函数的组合(Mono或者Flux,不是结果)->开启一个新的Work线程去做实际的数据准备工作,进行真正的业务操作->Work线程完成工作->返回用户真实数据。
    Mono和Flux是Reactor的两个基本概念。
    Mono和Flux属于事件发布者,为消费者提供订阅接口。当有事件发生的时候,Mono或Flux会回调消费者的相应方法,然后通知消费者相应的事件。这也是响应式编程的模型。
    Mono和Flux用于处理数据异步流,它不像MVC 中那样直接返回String/List,而是将异步数据流包装成Mono或者Flux对象。


    开发MVC下WebFlux的流程

    1.配置依赖pom.xml

        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-webflux</artifactId>
            </dependency>
    

    2.编写控制器

    @RestController
    public class HelloWorldCtrl {
        @RequestMapping("/hellw")
        public Mono<String> helloworld(){
            return Mono.just("hello this mono world");
        }
    }
    

    3.用注解式开发实现数据的增删改查
    类的实现

    package com.example.demo;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class User {
        private long id;
        private String name;
        private int age;
    }
    

    控制器的实现

    
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.*;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import javax.annotation.PostConstruct;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    @RestController
    @RequestMapping("/user")
    public class UserController {
        Map<Long,User> users = new HashMap<>();
    
        @PostConstruct
        public void init() throws Exception{
            System.out.println("i am in new array ");
            users.put(Long.valueOf(1),new User(1,"longzhonghua",2));
            users.put(Long.valueOf(2),new User(2,"longzhiran",2));
        }
    
        /**
         * 获取所有用户
         * */
        @GetMapping("/list1")
        public Flux<User> getAll(){
            return Flux.fromIterable(users.entrySet().stream()
                    .map(entry->entry.getValue())
                    .collect(Collectors.toList()));
        }
        /**
         * 获取单个用户
        **/
        @RequestMapping("/{id}")
        public Mono<User> getUser(@PathVariable Long id){
            return Mono.justOrEmpty(users.get(id));
        }
    
        /**
         * 创建用户
         * **/
        @PostMapping("")
        public Mono<ResponseEntity<String>>  addUser(User user){
            users.put(user.getId(),user);
            return Mono.just(new ResponseEntity<>("添加成功", HttpStatus.CREATED));
        }
    
        /**
         * 修改用户
         * **/
        @PutMapping("/{id}")
        public Mono<ResponseEntity<User>> putUser(@PathVariable Long id,User user){
            System.out.println("i am in put mapping");
            user.setId(id);
            users.put(id,user);
            return Mono.just(new ResponseEntity<>(user,HttpStatus.CREATED));
        }
    
        /**
         * 删除用户
         * **/
        @DeleteMapping("/{id}")
        public Mono<ResponseEntity<String>> deleteUser(@PathVariable Long id){
            users.remove(id);
            return Mono.just(new ResponseEntity<>("删除成功",HttpStatus.ACCEPTED));
        }
    }
    
    

    测试API的功能,获取数据,修改数据,修改数据的方法是通过PUT方法访问/user/1,提交相应的name和age字段来修改内容
    删除数据,可以通过DELETE方式访问/user/1,然后删除其中的内容

    响应式开发方式开发WebFlux

    处理器类Handler

    package com.example.demo;
    
    import org.springframework.http.MediaType;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import reactor.core.publisher.Mono;
    
    @Component
    public class HelloWorldHandler {
        public Mono<ServerResponse> sayHelloWord(ServerRequest  serverRequest){
            return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("this is webFlus demo"),String.class);
        }
    }
    
    

    路由器类Router

    package com.example.demo;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.ServerResponse;
    
    import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
    import static org.springframework.web.reactive.function.server.RouterFunctions.route;
    
    @Component
    public class Router {
        @Autowired
        private HelloWorldHandler  helloWorldHandler;
        @Bean
        public RouterFunction<ServerResponse> getString(){
            return route(GET("/helloworld"),req->helloWorldHandler.sayHelloWord(req));
        }
    }
    
    

    使用WebFlux运行时候出现端口占用查看和kill

    lsof -i tcp:8080
    kill 842
    

    WebFlux模式操作MongoDB数据库,实现数据的增删改查

    添加依赖

    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    

    配置文件中Application.properities配置MongoDB的地址信息

    spring.data.mongodb.uri=mongodb://127.0.0.1:27017/test
    

    创建实体类Usr.class

    package com.example.demo;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.springframework.data.annotation.Id;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Usr {
        @Id
        private String id;
        private String name;
        private int age;
    }
    

    接口实现

    package com.example.demo;
    
    import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
    
    public interface UsrRepository extends ReactiveMongoRepository<Usr,String> {
    }
    
    

    增删改查的API实现

    package com.example.demo;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.MediaType;
    import org.springframework.http.ResponseEntity;
    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.*;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    
    @RestController
    @RequestMapping(path = "/user")
    public class UserCtl {
        @Autowired
        private UsrRepository usrRepository;
    
        @GetMapping(value = "/list")
        public Flux<Usr> getAll(){
            return usrRepository.findAll();
        }
    
        @GetMapping(value = "/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
        public Flux<Usr> getAlldelay(){
            return usrRepository.findAll().delayElements(Duration.ofSeconds(1));
        }
    
        @GetMapping("/{id}")
        public Mono<ResponseEntity<Usr>> getUsr(@PathVariable String id){
            return usrRepository.findById(id)
                    .map(getUsr->ResponseEntity.ok(getUsr))
                    .defaultIfEmpty(ResponseEntity.notFound().build());
        }
    
        @PostMapping("")
        public Mono<Usr> creatUser(@Validated Usr usr){
            return usrRepository.save(usr);
        }
    
        public Mono updateUser(@PathVariable(value = "id") String id,@Validated Usr usr){
            return usrRepository.findById(id)
                    .flatMap(
                            existingUser->{existingUser.setName(usr.getName());
                                return usrRepository.save(existingUser);
                            })
                    .map(updateUser->new ResponseEntity<>(updateUser, HttpStatus.OK))
                    .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
        }
    
        @DeleteMapping("/{id}")
        public Mono<ResponseEntity<Void>> deleteUser(@PathVariable(value = "id") String id){
            return usrRepository.findById(id)
                    .flatMap(existingUser->usrRepository.delete(existingUser)
                            .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
                    .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
        }
    }
    
    

    其他
    在执行启动的时候需要先创建两个log文件,创建方法是

    touch mongo.log
    

    也不知道这里为什么不能用 echo "" > mongo.log.

    mongo的启动,depath数据路径,logpath日志 路径 ,--fork后台运行

    mongod --dbpath /usr/local/mongodb --logpath /usr/local/mongodb/mongo.log --fork
    

    启动后的查看

    ps aux | grep mongo
    
    

    mongoDB中的可视化数据


    mongodb.png

    特殊的请求方式
    如果使用postman的常规方式数据请求,就会出现阻塞调用的情况:


    阻塞到数据流结束或超时,一次返回.png

    因为涉及到了响应式
    数据流的请求方式,这里postman和APIFox不支持,直接使用命令行来进行网络请求

    curl -i localhost:8080/user/listdelay
    

    这里的演示效果如图:


    每一秒返回一次,滚动返回数据,直到数据结束.gif

    相关文章

      网友评论

          本文标题:WebFlux使用笔记

          本文链接:https://www.haomeiwen.com/subject/ccfydrtx.html