美文网首页
RSocket协议初识-Springboot中使用(一)

RSocket协议初识-Springboot中使用(一)

作者: 后厂村老司机 | 来源:发表于2020-05-27 22:03 被阅读0次

    前言

    前几天无聊翻SpringBoot官方文档,无意中发现文档中增加了一个章节叫RSocket协议的鬼东西,遂研究了一下。

    RSocket是什么?

    RSocket是一种二进制字节流传输协议,位于OSI模型中的5~6层,底层可以依赖TCP、WebSocket、Aeron协议。

    RSocket设计目标是什么?

    1、支持对象传输,包括request\response、request\stream、fire and forget、channel
    2、支持应用层流量控制
    3、支持单连接双向、多次复用
    4、支持连接修复
    5、更好的使用WebSocket和Aeron协议

    RSocket与其他协议有什么区别?

    对比Http1.x

    • Http1.x只支持request\response,但是现实应用中并不是所有请求都需要有回应(Fire And Forget)、有的需求需要一个请求返回一个数据流(request\stream)、有的还需要双向数据传输(channel)。

    对比Http2.x

    • http2.x不支持应用层流量控制、伪双向传输,即服务端push数据本质上还是对客户端请求的响应,而不是直接推送。RSocket做到了真正的双向传输,使得服务端可以调用客户端服务,使得服务端和客户端在角色上完全对等,即两边同时是Requester和Responder。

    对比grpc

    对比TCP

    • 其实两者不在一个层面,为啥要作比较呢,因为netty让tcp层的编程也很容易,但是需要自定义传输协议,比如定义header、body长度等等,用起来还是很麻烦的。

    对比WebSocket

    • websocket不支持应用层流量控制,本质上也是一端请求另一端响应,不支持连接修复。

    RSocket协议的形式是什么?

    • 连接上传输的数据是流(Stream)
    • 流(Stream)由帧(Frame)组成
    • 帧(Frame)包含了元数据(MetaData)与业务数据(Data)

    结论:

    基于RSocket协议,我们的业务数据会被打包成帧,并以帧流的形式在客户端与服务端互相传输。所以RSocket的所有特性都是基于这个帧流实现的。后续有时间会针对每个帧类型做解析。

    RSocket适用于哪些场景?

    1、移动设备与服务器的连接。

    • 数据双向传输,且支持流量控制。支持背压,背压的意思:如果客户端请求服务端过快,那么服务端会堆积请求,最终耗光资源。有了背压服务端可以根据自己的资源来控制客户端的请求速度,即调用客户端告诉他别发那么快。
    • 支持连接修复,比如手机进地铁之后,网络断开一段时间,其他协议需要重新建立连接,RSocket则可以修复连接继续传输帧数据。

    2、微服务场景。

    • spring cloud目前支持的http协议,不能fire and forget、不能请求流数据、不能单连接双向调用;替换成RSocket之后可以满足以上需求的同时提高性能。且针对服务治理、负载均衡等RSocket都在慢慢完善。

    3、由于微服务和移动设备的普及,RSocket火起来应该就是这几年的事儿。

    BB了这么多你给我上个代码

    SpringBoot中的使用

    • step1、构建SpringBoot项目,引入依赖
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-rsocket</artifactId>
            </dependency>
    
    • step2、编写需要传输的消息类和服务器类
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.time.Instant;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Message {
        private String from;
        private String to;
        private long index;
        private long created = Instant.now().getEpochSecond();
    
        public Message(String from, String to) {
            this.from = from;
            this.to = to;
            this.index = 0;
        }
    
        public Message(String from, String to, long index) {
            this.from = from;
            this.to = to;
            this.index = index;
        }
    }
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.messaging.rsocket.RSocketRequester;
    import org.springframework.messaging.rsocket.annotation.ConnectMapping;
    import org.springframework.stereotype.Controller;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    
    @Slf4j
    @Controller
    public class RSocketController {
    
        private final List<RSocketRequester> CLIENTS = new ArrayList<>();
    
        @MessageMapping("request-response")
        public Message requestResponse(Message request) {
            log.info("收到请求: {}", request);
            return new Message("服务端", "客户端");
        }
    
        @MessageMapping("fire-and-forget")
        public void fireAndForget(Message request) {
            log.info("收到fire-and-forget请求: {}", request);
        }
    
        @MessageMapping("stream")
        Flux<Message> stream(Message request) {
            log.info("收到流式请求: {}", request);
            return Flux
                    .interval(Duration.ofSeconds(1))
                    .map(index -> new Message(”服务端“, "客户端", index))
                    .log();
        }
    
        @MessageMapping("channel")
        Flux<Message> channel(final Flux<Duration> settings) {
            return settings
                    .doOnNext(setting -> log.info("发射间隔为 {} 秒.", setting.getSeconds()))
                    .switchMap(setting -> Flux.interval(setting)
                            .map(index -> new Message("服务端", "客户端", index)))
                    .log();
        }
    }
    
    • step3、配置文件里增加配置项
    spring.main.lazy-initialization=true
    spring.rsocket.server.port=7000
    
    • step4、编写客户端代码
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.time.Instant;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Message {
        private String from;
        private String to;
        private long index;
        private long created = Instant.now().getEpochSecond();
    
        public Message(String from, String to) {
            this.from = from;
            this.to = to;
            this.index = 0;
        }
    
        public Message(String from, String to, long index) {
            this.from = from;
            this.to = to;
            this.index = index;
        }
    }
    
    import java.time.Duration;
    import javax.annotation.PreDestroy;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.rsocket.RSocketRequester;
    import org.springframework.messaging.rsocket.RSocketStrategies;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.Disposable;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    @Slf4j
    @RestController
    public class RSocketClient {
    
        private final RSocketRequester rsocketRequester;
        private static Disposable disposable;
    
        @Autowired
        public RSocketClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
            this.rsocketRequester = rsocketRequesterBuilder
                    .rsocketStrategies(strategies)
                    .connectTcp("localhost", 7000)
                    .block();
    
            this.rsocketRequester.rsocket()
                    .onClose()
                    .doOnError(error -> log.warn("发生错误,链接关闭"))
                    .doFinally(consumer -> log.info("链接关闭"))
                    .subscribe();
        }
    
        @PreDestroy
        void shutdown() {
            rsocketRequester.rsocket().dispose();
        }
    
        @GetMapping("request-response")
        public Message requestResponse() {
            Message message = this.rsocketRequester
                    .route("request-response")
                    .data(new Message("客户端", "服务器"))
                    .retrieveMono(Message.class)
                    .block();
            log.info("客户端request-response收到响应 {}", message);
            return message;
        }
    
        @GetMapping("fire-and-forget")
        public String fireAndForget() {
            this.rsocketRequester
                    .route("fire-and-forget")
                    .data(new Message("客户端", "服务器"))
                    .send()
                    .block();
            return "fire and forget";
        }
    
        @GetMapping("stream")
        public String stream() {
            disposable = this.rsocketRequester
                    .route("stream")
                    .data(new Message("客户端", "服务器"))
                    .retrieveFlux(Message.class)
                    .subscribe(message -> log.info("客户端stream收到响应 {}", message));
            return "stream";
        }
    
        @GetMapping("channel")
        public String channel() {
            Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
            Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
            Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));
            Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
                    .doOnNext(d -> log.info("客户端channel发送消息 {}", d.getSeconds()));
            disposable = this.rsocketRequester
                    .route("channel")
                    .data(settings)
                    .retrieveFlux(Message.class)
                    .subscribe(message -> log.info("客户端channel收到响应 {}", message));
            return "channel";
        }
    
    }
    
    • step5、启动服务端、启动客户端,打开浏览器访问localhost:8080/fire-and-forget等测试效果

    代码解析

    • @MessageMapping:Spring提供的注解,用于路由,与@GetMapping等功能类似
    • Mono:响应式编程里用于返回0-1个结果
    • Flux:响应式编程里用于返回0-N个结果
    • Disposable:断流器,为true的时候两边不能传输数据

    What Next?

    • 协议原理解析
    • 由于RSocket社区还不够活跃,Git上的代码也是刚刚起步,还在不断更新中,相关功能也在不断完善中,后续随着官方新内容的更新我也会跟着更新。
    • RSocket中很多概念如Mono、Flux、Disposable、背压、流式处理等都是响应式编程中的概念,想了解响应式编程可以查看:http://reactivex.io/ 中的文档,其中包括了RXJava等RX系列的各种语言的Demo。

    相关文章

      网友评论

          本文标题:RSocket协议初识-Springboot中使用(一)

          本文链接:https://www.haomeiwen.com/subject/vqugahtx.html