美文网首页技术分享
SpringBoot整合WebFlux实现SSE事件

SpringBoot整合WebFlux实现SSE事件

作者: 程就人生 | 来源:发表于2020-05-08 22:09 被阅读0次

    前言

    在前台页面需要不停获取服务器端的数据时,无非有两种操作,一种是通过前台页面使用轮询的方式,定时向服务器后台发送请求,以获取最新的数据;另一种就是在前台页面和后台服务之间建立长连接,服务器端一有数据产生就向前端页面推送。

    这里的SSE是服务器发送事件(Server-Sent Events) 的缩写,在WebFlux框架里,服务器端是如何向前端(或调用端)实现服务器发送事件的呢?在有前端页面的情况下,又是如何实现的呢?

    带着上面的这些疑问,来了解WebFlux框架,WebFlux框架是一款响应式编程web框架,什么是响应式编程呢,根据wikipedia上的定义:

    响应式编程是就是对于数据流和传播改变的一种声明式的编程规范。这意味着可以通过编程语言轻松地表达静态(例如数组)或动态(例如事件发射器)数据流,并且存在相关执行模型内的推断依赖性,这有助于自动传播数据流涉及的变化。

    围绕着WebFlux框架的,有这么几个关键字,异步的、非阻塞的、响应式的,那么是不是能够实现数据一有变化,就通知到对应的调用端呢,这些还有待证实。

    基于WebFlux框架的SSE应用

    首先,在pom文件中,引入webflux框架;

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    

    第二,html代码,共有四个页面;
    sse.html页面代码:

    <!DOCTYPE html>
    <html xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8"/>
        <title>服务器推送事件</title>
    </head>
    <body>
    <div>    
        <div id="data"></div>    
        <div id="result"></div><br/>
    </div>
    <script th:inline="javascript" >
    //服务器推送事件
    if (typeof (EventSource) !== "undefined") { 
        //第一种写法
        //接收服务器倒计时时间推送,使用HTML5 服务器发送事件(Server-Sent Events),参考资料:https://www.runoob.com/html/html5-serversentevents.html
        var source = new EventSource("/sse/countDown");
        console.log(source);
        
        source.addEventListener("countDown", function(e) {
            document.getElementById("result").innerHTML = e.data;
        }, false);//使用false表示在冒泡阶段处理事件,而不是捕获阶段。
        
        //第二种写法
        //随机数获取
        var source1 = new EventSource("/sse/retrieve");
        //当抓取到消息时
        source1.onmessage = function (evt) {
            document.getElementById("data").innerHTML = "股票行情:" + evt.data;
        };
    } else {
        //注意:ie浏览器不支持
        document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";  
        var xhr;
        var xhr2;
        if (window.XMLHttpRequest){
            //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
            xhr=new XMLHttpRequest();
            xhr2=new XMLHttpRequest();
        }else{
            //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
            xhr=new ActiveXObject("Microsoft.XMLHTTP");
            xhr2=new ActiveXObject("Microsoft.XMLHTTP");
        }
        console.log(xhr);
        console.log(xhr2);
        xhr.open('GET', '/sse/countDown');
        xhr.send(null);//发送请求
        xhr.onreadystatechange = function() {
            console.log("s响应状态:" + xhr.readyState);
            //2是空响应,3是响应一部分,4是响应完成
            if (xhr.readyState > 2) {
                //这儿可以使用response(对应json)与responseText(对应text)
                var newData = xhr.response.substr(xhr.seenBytes);
                newData = newData.replace(/\n/g, "#");
                newData = newData.substring(0, newData.length - 1);
                var data = newData.split("#");
                console.log("获取到的数据:" + data);
                document.getElementById("result").innerHTML = data;
                //长度重新赋值,下次截取时需要使用
                xhr.seenBytes = xhr.response.length;
            }
        }
            
        xhr2.open('GET', '/sse/retrieve');
        xhr2.send(null);//发送请求
        xhr2.onreadystatechange = function() {
            console.log("s响应状态:" + xhr2.readyState);
            //0: 请求未初始化,2 请求已接收,3 请求处理中,4  请求已完成,且响应已就绪
            if (xhr2.readyState > 2) {
                //这儿可以使用response(对应json)与responseText(对应text)
                var newData1 = xhr2.response.substr(xhr2.seenBytes);
                newData1 = newData1.replace(/\n/g, "#");
                newData1 = newData1.substring(0, newData1.length - 1);
                var data1 = newData1.split("#");
                console.log("获取到的数据:" + data1);
                document.getElementById("data").innerHTML = data1;
                //长度重新赋值,下次截取时需要使用
                xhr2.seenBytes = xhr2.response.length;
            }
        }
    }
    </script>
    </body>
    </html>
    

    sse2.html页面代码:

    <!DOCTYPE html>
    <html xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8"/>
        <title>服务器推送</title>
    </head>
    <body>
    <div>
        <div id="dataModule"></div><br/>
        <div id="note" style="width: 100%;" ></div>
    </div>
    <script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
    <script th:inline="javascript" >
    $(function() {
        var time=1;
        var xhr;
        if (window.XMLHttpRequest){
            //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
            xhr=new XMLHttpRequest();
        }else{
            //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
            xhr=new ActiveXObject("Microsoft.XMLHTTP");
        }
        console.log(xhr);
        xhr.open('GET', '/quotes');
        xhr.send(null);//发送请求
        xhr.onreadystatechange = function() {
            console.log("s响应状态:" + xhr.readyState);
            //2是空响应,3是响应一部分,4是响应完成
            if (xhr.readyState > 2) {
                //这儿可以使用response(对应json)与responseText(对应text)
                var newData = xhr.response.substr(xhr.seenBytes);
                newData = newData.replace(/\n/g, "#");
                newData = newData.substring(0, newData.length - 1);
                var data = newData.split("#");
                //显示加载次数,和大小
                $("#dataModule").append("第"+time+"次数据响应"+data.length+"条<br/>");
                
                $("#note").append("<div style='clear: both;'>第"+time+"次数据响应"+data.length+"条</div><div id='note"+time+"' style='width: 100%;'></div>");
                var html="";
                console.log("数据:" + data);          
                for(var i=0;i<data.length;i++) {
                     var obj = JSON.parse(data[i]);
                     html=html + "<div style='margin-left: 10px;margin-top: 10px; width: 80px;height: 80px;background-color: gray;float: left;'>"+obj.ticker+"</div>";
                }           
                $("#note"+time).html(html);
                time++;
                //长度重新赋值,下次截取时需要使用
                xhr.seenBytes = xhr.response.length;
            }
        }
    })
    </script>
    </body>
    </html>
    

    sse3.html页面代码:

    <!DOCTYPE html>
    <html xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8"/>
        <title>服务器推送</title>
    </head>
    <body>
    <div>
        <div id="dataModule"></div><br/>
        <div id="note" style="width: 100%;" ></div>
    </div>
    <script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
    <script th:inline="javascript" >
    $(function() {
        var time=1;
        var xhr;
        if (window.XMLHttpRequest){
            //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
            xhr=new XMLHttpRequest();
        }else{
            //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
            xhr=new ActiveXObject("Microsoft.XMLHTTP");
        }
        console.log(xhr);
        xhr.open('POST', '/quotes');
        xhr.send(null);//发送请求
        xhr.onreadystatechange = function() {
            console.log("s响应状态:" + xhr.readyState);
            //2是空响应,3是响应一部分,4是响应完成
            if (xhr.readyState > 2) {
                //这儿可以使用response(对应json)与responseText(对应text)
                var newData = xhr.response.substr(xhr.seenBytes);
                newData = newData.replace(/\n/g, "#");
                newData = newData.substring(0, newData.length);
                console.log("数据:" + newData);
                if(newData){
                    //将字符串类型的json转成json对象
                    var data = JSON.parse(newData.split("#"));
                    //显示加载次数,和大小
                    $("#dataModule").append("第"+time+"次数据响应"+data.length+"条<br/>");
                    
                    $("#note").append("<div style='clear: both;'>第"+time+"次数据响应"+data.length+"条</div><div id='note"+time+"' style='width: 100%;'></div>");
                    var html="";
                    console.log("数据:" + data);
                    for(var i=0;i<data.length;i++) {
                         var obj = data[i];
                         html=html + "<div style='margin-left: 10px;margin-top: 10px; width: 80px;height: 80px;background-color: gray;float: left;'>"+obj.ticker+"</div>";
                    }           
                    $("#note"+time).html(html);
                    time++;
                    //长度重新赋值,下次截取时需要使用
                    xhr.seenBytes = xhr.response.length;
                }else{
                    $("#dataModule").append("响应完成!!!");
                }
            }
        }
    })
    </script>
    </body>
    </html>
    

    sse4.html页面代码:

    <!DOCTYPE html>
    <html xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8"/>
        <title>服务器推送</title>
    </head>
    <body>
    <div>
        <div id="dataModule"></div><br/>
        <div id="note" style="width: 100%;" ></div>
    </div>
    <script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
    <script th:inline="javascript" >
    $(function() {
        var time=1;
        var xhr;
        if (window.XMLHttpRequest){
            //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
            xhr=new XMLHttpRequest();
        }else{
            //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
            xhr=new ActiveXObject("Microsoft.XMLHTTP");
        }
        console.log(xhr);
        xhr.open('POST', '/echo1');
        xhr.send(null);//发送请求
        xhr.onreadystatechange = function() {
            console.log("s响应状态:" + xhr.readyState);
            //2是空响应,3是响应一部分,4是响应完成
            if (xhr.readyState > 2) {
                //这儿可以使用response(对应json)与responseText(对应text)
                var newData = xhr.response.substr(xhr.seenBytes);
                newData = newData.replace(/\n/g, "#");
                newData = newData.substring(0, newData.length - 1);
                var data = newData.split("#");
                console.log("获取到的数据:" + data);
                document.getElementById("dataModule").innerHTML = data;
            }
        }
    })
    </script>
    </body>
    </html>
    

    注意:在前端页面,接收服务器的推送请求,需要html5的SSE支持,除了IE外,其他的浏览器都支持;

    第三,后台代码;

    import java.math.BigDecimal;
    import java.math.MathContext;
    import java.time.Instant;
    
    /**
     * 需要推送的实体类
     * @author 程就人生
     * @Date
     */
    public class Quote { 
    
        private static final MathContext MATH_CONTEXT = new MathContext(2); 
    
        private String ticker; 
    
        private BigDecimal price; 
    
        private Instant instant; 
    
        public Quote() {
    
        } 
    
        public Quote(String ticker, BigDecimal price) {
    
            this.ticker = ticker;
    
            this.price = price;
    
        } 
    
        public Quote(String ticker, Double price) {
    
            this(ticker, new BigDecimal(price, MATH_CONTEXT));
    
        }
    
        @Override
        public String toString() {
    
            return "Quote{" +
    
                    "ticker='" + ticker + '\'' +
    
                    ", price=" + price +
    
                    ", instant=" + instant +
    
                    '}';
    
        }
    
        public final String getTicker() {
            return ticker;
        }
    
        public final void setTicker(String ticker) {
            this.ticker = ticker;
        }
    
        public final BigDecimal getPrice() {
            return price;
        }
    
        public final void setPrice(BigDecimal price) {
            this.price = price;
        }
    
        public final Instant getInstant() {
            return instant;
        }
        
        public final void setInstant(Instant instant) {
            this.instant = instant;
        }
    }
    
    import java.math.BigDecimal;
    import java.math.MathContext;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;
    
    import org.springframework.stereotype.Component;
    
    import com.example.entity.Quote;
    
    import reactor.core.publisher.Flux;
    
    /**
     * 推送数据,模拟生成
     * @author 程就人生
     * @Date
     */
    @Component
    public class QuoteGenerator { 
    
        private final MathContext mathContext = new MathContext(2); 
    
        private final Random random = new Random(); 
    
        private final List<Quote> prices = new ArrayList<>(); 
    
        /**
         * 生成行情数据
         */
        public QuoteGenerator() {
    
            this.prices.add(new Quote("CTXS", 82.26));
    
            this.prices.add(new Quote("DELL", 63.74));
    
            this.prices.add(new Quote("GOOG", 847.24));
    
            this.prices.add(new Quote("MSFT", 65.11));
    
            this.prices.add(new Quote("ORCL", 45.71));
    
            this.prices.add(new Quote("RHT", 84.29));
    
            this.prices.add(new Quote("VMW", 92.21));
    
        }
    
        public Flux<Quote> fetchQuoteStream(Duration period) { 
    
            // 需要周期生成值并返回,使用 Flux.interval
            return Flux.interval(period)
    
                    // In case of back-pressure, drop events
                    .onBackpressureDrop()
    
                    // For each tick, generate a list of quotes
                    .map(this::generateQuotes)
    
                    // "flatten" that List<Quote> into a Flux<Quote>
                    .flatMapIterable(quotes -> quotes)
    
                    .log("io.spring.workshop.stockquotes");//以日志的形式输出
    
        }
    
        /**
    
         * Create quotes for all tickers at a single instant.
    
         */
        private List<Quote> generateQuotes(long interval) {
    
            final Instant instant = Instant.now();
    
            return prices.stream()
    
                    .map(baseQuote -> {
    
                        BigDecimal priceChange = baseQuote.getPrice()
    
                                .multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);
    
                        Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));
    
                        result.setInstant(instant);
    
                        return result;
    
                    })
    
                    .collect(Collectors.toList());
    
        }
    
    }
    
    import java.time.Duration;
    
    import org.springframework.http.MediaType;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.BodyInserters;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    
    import com.example.entity.Quote;
    import com.example.generator.QuoteGenerator;
    
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    /**
     * 数据处理handler,相当于service层
     * @author 程就人生
     * @Date
     */
    @Component
    public class QuoteHandler { 
    
        private final Flux<Quote> quoteStream; 
    
        public QuoteHandler(QuoteGenerator quoteGenerator) {
    
            this.quoteStream = quoteGenerator.fetchQuoteStream(Duration.ofMillis(1000 * 10)).share();
    
        } 
    
        public Mono<ServerResponse> hello(ServerRequest request) {
            Long start = System.currentTimeMillis();
            return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
    
                    .body(BodyInserters.fromObject("Hello Spring!" + start));
    
        } 
    
        public Mono<ServerResponse> echo(ServerRequest request) {
    
            return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
    
                    .body(request.bodyToMono(String.class), String.class);
    
        } 
    
        public Mono<ServerResponse> streamQuotes(ServerRequest request) {
            
            Long start = System.currentTimeMillis();
            
            System.out.println("--------------" + start + "--------------");
    
            return ServerResponse.ok()
    
                    .contentType(MediaType.APPLICATION_STREAM_JSON) //返回多次
    
                    .body(this.quoteStream, Quote.class);
    
        } 
    
        public Mono<ServerResponse> fetchQuotes(ServerRequest request) {
    
            int size = Integer.parseInt(request.queryParam("size").orElse("10"));
    
            return ServerResponse.ok()
    
                    .contentType(MediaType.APPLICATION_JSON)         //返回一次
    
                    .body(this.quoteStream.take(size), Quote.class);
    
        }
    }
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.MediaType;
    import org.springframework.web.reactive.function.server.RequestPredicates;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.RouterFunctions;
    import org.springframework.web.reactive.function.server.ServerResponse;
    
    import com.example.handler.QuoteHandler;
    
    /**
     * 路由,相当于Controller层
     * @author 程就人生
     * @Date
     */
    @Configuration
    public class QuoteRouter { 
    
       @Bean
       public RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {
    
          return RouterFunctions
    
                .route(RequestPredicates.GET("/hello1").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), quoteHandler::hello)
                
                .andRoute(RequestPredicates.POST("/echo1").and(RequestPredicates.accept(MediaType.TEXT_PLAIN).and(RequestPredicates.contentType(MediaType.TEXT_PLAIN))), quoteHandler::echo)
                //响应一次
                .andRoute(RequestPredicates.POST("/quotes").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), quoteHandler::fetchQuotes)
                //响应多次
                .andRoute(RequestPredicates.GET("/quotes").and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);
    
       }
    }
    
    import java.time.Duration;
    
    import org.springframework.http.MediaType;
    import org.springframework.http.codec.ServerSentEvent;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    import reactor.core.publisher.Flux;
    import reactor.util.function.Tuples;
    
    /**
     * 服务器发送事件SSE(Server-Sent Events)
     * 页面渲染及请求
     * @author 程就人生
     * @Date
     */
    @Controller
    @RequestMapping("/sse")
    public class SseController {
    
        //三分钟倒计时
        private int count_down_sec=3*60*60;
        
        /**
         * 推送页面1
         * @return
         */
        @GetMapping
        public String sse(){
            return "sse";
        }
        
        /**
         * 推送页面2
         * @return
         */
        @GetMapping("/two")
        public String sse2(){
            return "sse2";
        }
        
        /**
         * 推送页面3
         * @return
         */
        @GetMapping("/three")
        public String sse3(){
            return "sse3";
        }
        
        /**
         * 推送页面4
         * @return
         */
        @GetMapping("/four")
        public String sse4(){
            return "sse4";
        }
    
        //报头设置为 "text/event-stream",以便于发送事件流
        @GetMapping(value="/countDown",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        @ResponseBody
        public Flux<ServerSentEvent<Object>> countDown() {
            //每一秒钟推送一次
            return Flux.interval(Duration.ofSeconds(1))
                .map(seq -> Tuples.of(seq, getCountDownSec()))
                .map(data -> ServerSentEvent.<Object>builder()
                        .event("countDown") //和前端addEventListener监听的事件一一对应
                        .id(Long.toString(data.getT1()))  //为每次发送设置一个id
                        .data(data.getT2().toString())
                        .build());
        }
        
        private String getCountDownSec() {
            if (count_down_sec>0) {
                int h = count_down_sec/(60*60);
                int m = (count_down_sec%(60*60))/60;
                int s = (count_down_sec%(60*60))%60;
                count_down_sec--;
                return "活动倒计时:"+h+" 小时 "+m+" 分钟 "+s+" 秒";
            }
            return "活动倒计时:0 小时 0 分钟 0 秒";
        }
        
        //报头设置为 "text/event-stream",以便于发送事件流,这种写法等同于MediaType.TEXT_EVENT_STREAM_VALUE "text/event-stream;charset=UTF-8"
        @GetMapping(value = "/retrieve",produces = MediaType.TEXT_EVENT_STREAM_VALUE)   
        @ResponseBody
        public double retrieve() {
            try {   
                //每0.5秒刷新数据 
                Thread.sleep(500);  
            } catch (InterruptedException e) {  
                e.printStackTrace();    
            }   
            //模拟股票实时变动数据    
            return Math.ceil(Math.random() * 10000);    
        }
    }
    

    最后,测试运行结果;

    总结
    虽然参考了很多资料,对于响应式编程还是很陌生,写个demo后,依旧没有感受到它的精华,基于WebFlux框架实现SSE事件,不难看出来还是基于长连接的,在实际场景中,基于长连接的推送事件是否适用,还值得再思考。

    参考资料:
    https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web-reactive.html#webflux-dispatcher-handler
    https://blog.csdn.net/wshl1234567/article/details/80320116
    https://blog.csdn.net/Message_lx/article/details/81075766
    https://www.cnblogs.com/Alandre/category/957422.html
    https://segmentfault.com/a/1190000020686218?utm_source=tag-newest
    https://my.oschina.net/bianxin/blog/3063713
    html5服务器发送事件
    https://www.runoob.com/html/html5-serversentevents.html
    https://www.xttblog.com/spring-webflux.html

    相关文章

      网友评论

        本文标题:SpringBoot整合WebFlux实现SSE事件

        本文链接:https://www.haomeiwen.com/subject/rgbqnhtx.html