前言
在前台页面需要不停获取服务器端的数据时,无非有两种操作,一种是通过前台页面使用轮询的方式,定时向服务器后台发送请求,以获取最新的数据;另一种就是在前台页面和后台服务之间建立长连接,服务器端一有数据产生就向前端页面推送。
这里的SSE是服务器发送事件(Server-Sent Events) 的缩写,在WebFlux框架里,服务器端是如何向前端(或调用端)实现服务器发送事件的呢?在有前端页面的情况下,又是如何实现的呢?
带着上面的这些疑问,来了解WebFlux框架,WebFlux框架是一款响应式编程web框架,什么是响应式编程呢,根据wikipedia上的定义:
响应式编程是就是对于数据流和传播改变的一种声明式的编程规范。这意味着可以通过编程语言轻松地表达静态(例如数组)或动态(例如事件发射器)数据流,并且存在相关执行模型内的推断依赖性,这有助于自动传播数据流涉及的变化。
围绕着WebFlux框架的,有这么几个关键字,异步的、非阻塞的、响应式的,那么是不是能够实现数据一有变化,就通知到对应的调用端呢,这些还有待证实。
基于WebFlux框架的SSE应用
首先,在pom文件中,引入webflux框架;
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
第二,html代码,共有四个页面;
sse.html页面代码:
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8"/>
<title>服务器推送事件</title>
</head>
<body>
<div>
<div id="data"></div>
<div id="result"></div><br/>
</div>
<script th:inline="javascript" >
//服务器推送事件
if (typeof (EventSource) !== "undefined") {
//第一种写法
//接收服务器倒计时时间推送,使用HTML5 服务器发送事件(Server-Sent Events),参考资料:https://www.runoob.com/html/html5-serversentevents.html
var source = new EventSource("/sse/countDown");
console.log(source);
source.addEventListener("countDown", function(e) {
document.getElementById("result").innerHTML = e.data;
}, false);//使用false表示在冒泡阶段处理事件,而不是捕获阶段。
//第二种写法
//随机数获取
var source1 = new EventSource("/sse/retrieve");
//当抓取到消息时
source1.onmessage = function (evt) {
document.getElementById("data").innerHTML = "股票行情:" + evt.data;
};
} else {
//注意:ie浏览器不支持
document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";
var xhr;
var xhr2;
if (window.XMLHttpRequest){
//IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
xhr=new XMLHttpRequest();
xhr2=new XMLHttpRequest();
}else{
//IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
xhr=new ActiveXObject("Microsoft.XMLHTTP");
xhr2=new ActiveXObject("Microsoft.XMLHTTP");
}
console.log(xhr);
console.log(xhr2);
xhr.open('GET', '/sse/countDown');
xhr.send(null);//发送请求
xhr.onreadystatechange = function() {
console.log("s响应状态:" + xhr.readyState);
//2是空响应,3是响应一部分,4是响应完成
if (xhr.readyState > 2) {
//这儿可以使用response(对应json)与responseText(对应text)
var newData = xhr.response.substr(xhr.seenBytes);
newData = newData.replace(/\n/g, "#");
newData = newData.substring(0, newData.length - 1);
var data = newData.split("#");
console.log("获取到的数据:" + data);
document.getElementById("result").innerHTML = data;
//长度重新赋值,下次截取时需要使用
xhr.seenBytes = xhr.response.length;
}
}
xhr2.open('GET', '/sse/retrieve');
xhr2.send(null);//发送请求
xhr2.onreadystatechange = function() {
console.log("s响应状态:" + xhr2.readyState);
//0: 请求未初始化,2 请求已接收,3 请求处理中,4 请求已完成,且响应已就绪
if (xhr2.readyState > 2) {
//这儿可以使用response(对应json)与responseText(对应text)
var newData1 = xhr2.response.substr(xhr2.seenBytes);
newData1 = newData1.replace(/\n/g, "#");
newData1 = newData1.substring(0, newData1.length - 1);
var data1 = newData1.split("#");
console.log("获取到的数据:" + data1);
document.getElementById("data").innerHTML = data1;
//长度重新赋值,下次截取时需要使用
xhr2.seenBytes = xhr2.response.length;
}
}
}
</script>
</body>
</html>
sse2.html页面代码:
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8"/>
<title>服务器推送</title>
</head>
<body>
<div>
<div id="dataModule"></div><br/>
<div id="note" style="width: 100%;" ></div>
</div>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script th:inline="javascript" >
$(function() {
var time=1;
var xhr;
if (window.XMLHttpRequest){
//IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
xhr=new XMLHttpRequest();
}else{
//IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
xhr=new ActiveXObject("Microsoft.XMLHTTP");
}
console.log(xhr);
xhr.open('GET', '/quotes');
xhr.send(null);//发送请求
xhr.onreadystatechange = function() {
console.log("s响应状态:" + xhr.readyState);
//2是空响应,3是响应一部分,4是响应完成
if (xhr.readyState > 2) {
//这儿可以使用response(对应json)与responseText(对应text)
var newData = xhr.response.substr(xhr.seenBytes);
newData = newData.replace(/\n/g, "#");
newData = newData.substring(0, newData.length - 1);
var data = newData.split("#");
//显示加载次数,和大小
$("#dataModule").append("第"+time+"次数据响应"+data.length+"条<br/>");
$("#note").append("<div style='clear: both;'>第"+time+"次数据响应"+data.length+"条</div><div id='note"+time+"' style='width: 100%;'></div>");
var html="";
console.log("数据:" + data);
for(var i=0;i<data.length;i++) {
var obj = JSON.parse(data[i]);
html=html + "<div style='margin-left: 10px;margin-top: 10px; width: 80px;height: 80px;background-color: gray;float: left;'>"+obj.ticker+"</div>";
}
$("#note"+time).html(html);
time++;
//长度重新赋值,下次截取时需要使用
xhr.seenBytes = xhr.response.length;
}
}
})
</script>
</body>
</html>
sse3.html页面代码:
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8"/>
<title>服务器推送</title>
</head>
<body>
<div>
<div id="dataModule"></div><br/>
<div id="note" style="width: 100%;" ></div>
</div>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script th:inline="javascript" >
$(function() {
var time=1;
var xhr;
if (window.XMLHttpRequest){
//IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
xhr=new XMLHttpRequest();
}else{
//IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
xhr=new ActiveXObject("Microsoft.XMLHTTP");
}
console.log(xhr);
xhr.open('POST', '/quotes');
xhr.send(null);//发送请求
xhr.onreadystatechange = function() {
console.log("s响应状态:" + xhr.readyState);
//2是空响应,3是响应一部分,4是响应完成
if (xhr.readyState > 2) {
//这儿可以使用response(对应json)与responseText(对应text)
var newData = xhr.response.substr(xhr.seenBytes);
newData = newData.replace(/\n/g, "#");
newData = newData.substring(0, newData.length);
console.log("数据:" + newData);
if(newData){
//将字符串类型的json转成json对象
var data = JSON.parse(newData.split("#"));
//显示加载次数,和大小
$("#dataModule").append("第"+time+"次数据响应"+data.length+"条<br/>");
$("#note").append("<div style='clear: both;'>第"+time+"次数据响应"+data.length+"条</div><div id='note"+time+"' style='width: 100%;'></div>");
var html="";
console.log("数据:" + data);
for(var i=0;i<data.length;i++) {
var obj = data[i];
html=html + "<div style='margin-left: 10px;margin-top: 10px; width: 80px;height: 80px;background-color: gray;float: left;'>"+obj.ticker+"</div>";
}
$("#note"+time).html(html);
time++;
//长度重新赋值,下次截取时需要使用
xhr.seenBytes = xhr.response.length;
}else{
$("#dataModule").append("响应完成!!!");
}
}
}
})
</script>
</body>
</html>
sse4.html页面代码:
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8"/>
<title>服务器推送</title>
</head>
<body>
<div>
<div id="dataModule"></div><br/>
<div id="note" style="width: 100%;" ></div>
</div>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" ></script>
<script th:inline="javascript" >
$(function() {
var time=1;
var xhr;
if (window.XMLHttpRequest){
//IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
xhr=new XMLHttpRequest();
}else{
//IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
xhr=new ActiveXObject("Microsoft.XMLHTTP");
}
console.log(xhr);
xhr.open('POST', '/echo1');
xhr.send(null);//发送请求
xhr.onreadystatechange = function() {
console.log("s响应状态:" + xhr.readyState);
//2是空响应,3是响应一部分,4是响应完成
if (xhr.readyState > 2) {
//这儿可以使用response(对应json)与responseText(对应text)
var newData = xhr.response.substr(xhr.seenBytes);
newData = newData.replace(/\n/g, "#");
newData = newData.substring(0, newData.length - 1);
var data = newData.split("#");
console.log("获取到的数据:" + data);
document.getElementById("dataModule").innerHTML = data;
}
}
})
</script>
</body>
</html>
注意:在前端页面,接收服务器的推送请求,需要html5的SSE支持,除了IE外,其他的浏览器都支持;
第三,后台代码;
import java.math.BigDecimal;
import java.math.MathContext;
import java.time.Instant;
/**
* 需要推送的实体类
* @author 程就人生
* @Date
*/
public class Quote {
private static final MathContext MATH_CONTEXT = new MathContext(2);
private String ticker;
private BigDecimal price;
private Instant instant;
public Quote() {
}
public Quote(String ticker, BigDecimal price) {
this.ticker = ticker;
this.price = price;
}
public Quote(String ticker, Double price) {
this(ticker, new BigDecimal(price, MATH_CONTEXT));
}
@Override
public String toString() {
return "Quote{" +
"ticker='" + ticker + '\'' +
", price=" + price +
", instant=" + instant +
'}';
}
public final String getTicker() {
return ticker;
}
public final void setTicker(String ticker) {
this.ticker = ticker;
}
public final BigDecimal getPrice() {
return price;
}
public final void setPrice(BigDecimal price) {
this.price = price;
}
public final Instant getInstant() {
return instant;
}
public final void setInstant(Instant instant) {
this.instant = instant;
}
}
import java.math.BigDecimal;
import java.math.MathContext;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import org.springframework.stereotype.Component;
import com.example.entity.Quote;
import reactor.core.publisher.Flux;
/**
* 推送数据,模拟生成
* @author 程就人生
* @Date
*/
@Component
public class QuoteGenerator {
private final MathContext mathContext = new MathContext(2);
private final Random random = new Random();
private final List<Quote> prices = new ArrayList<>();
/**
* 生成行情数据
*/
public QuoteGenerator() {
this.prices.add(new Quote("CTXS", 82.26));
this.prices.add(new Quote("DELL", 63.74));
this.prices.add(new Quote("GOOG", 847.24));
this.prices.add(new Quote("MSFT", 65.11));
this.prices.add(new Quote("ORCL", 45.71));
this.prices.add(new Quote("RHT", 84.29));
this.prices.add(new Quote("VMW", 92.21));
}
public Flux<Quote> fetchQuoteStream(Duration period) {
// 需要周期生成值并返回,使用 Flux.interval
return Flux.interval(period)
// In case of back-pressure, drop events
.onBackpressureDrop()
// For each tick, generate a list of quotes
.map(this::generateQuotes)
// "flatten" that List<Quote> into a Flux<Quote>
.flatMapIterable(quotes -> quotes)
.log("io.spring.workshop.stockquotes");//以日志的形式输出
}
/**
* Create quotes for all tickers at a single instant.
*/
private List<Quote> generateQuotes(long interval) {
final Instant instant = Instant.now();
return prices.stream()
.map(baseQuote -> {
BigDecimal priceChange = baseQuote.getPrice()
.multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);
Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));
result.setInstant(instant);
return result;
})
.collect(Collectors.toList());
}
}
import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.example.entity.Quote;
import com.example.generator.QuoteGenerator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 数据处理handler,相当于service层
* @author 程就人生
* @Date
*/
@Component
public class QuoteHandler {
private final Flux<Quote> quoteStream;
public QuoteHandler(QuoteGenerator quoteGenerator) {
this.quoteStream = quoteGenerator.fetchQuoteStream(Duration.ofMillis(1000 * 10)).share();
}
public Mono<ServerResponse> hello(ServerRequest request) {
Long start = System.currentTimeMillis();
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromObject("Hello Spring!" + start));
}
public Mono<ServerResponse> echo(ServerRequest request) {
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
.body(request.bodyToMono(String.class), String.class);
}
public Mono<ServerResponse> streamQuotes(ServerRequest request) {
Long start = System.currentTimeMillis();
System.out.println("--------------" + start + "--------------");
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON) //返回多次
.body(this.quoteStream, Quote.class);
}
public Mono<ServerResponse> fetchQuotes(ServerRequest request) {
int size = Integer.parseInt(request.queryParam("size").orElse("10"));
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON) //返回一次
.body(this.quoteStream.take(size), Quote.class);
}
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.example.handler.QuoteHandler;
/**
* 路由,相当于Controller层
* @author 程就人生
* @Date
*/
@Configuration
public class QuoteRouter {
@Bean
public RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {
return RouterFunctions
.route(RequestPredicates.GET("/hello1").and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), quoteHandler::hello)
.andRoute(RequestPredicates.POST("/echo1").and(RequestPredicates.accept(MediaType.TEXT_PLAIN).and(RequestPredicates.contentType(MediaType.TEXT_PLAIN))), quoteHandler::echo)
//响应一次
.andRoute(RequestPredicates.POST("/quotes").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), quoteHandler::fetchQuotes)
//响应多次
.andRoute(RequestPredicates.GET("/quotes").and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);
}
}
import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
/**
* 服务器发送事件SSE(Server-Sent Events)
* 页面渲染及请求
* @author 程就人生
* @Date
*/
@Controller
@RequestMapping("/sse")
public class SseController {
//三分钟倒计时
private int count_down_sec=3*60*60;
/**
* 推送页面1
* @return
*/
@GetMapping
public String sse(){
return "sse";
}
/**
* 推送页面2
* @return
*/
@GetMapping("/two")
public String sse2(){
return "sse2";
}
/**
* 推送页面3
* @return
*/
@GetMapping("/three")
public String sse3(){
return "sse3";
}
/**
* 推送页面4
* @return
*/
@GetMapping("/four")
public String sse4(){
return "sse4";
}
//报头设置为 "text/event-stream",以便于发送事件流
@GetMapping(value="/countDown",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<ServerSentEvent<Object>> countDown() {
//每一秒钟推送一次
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> Tuples.of(seq, getCountDownSec()))
.map(data -> ServerSentEvent.<Object>builder()
.event("countDown") //和前端addEventListener监听的事件一一对应
.id(Long.toString(data.getT1())) //为每次发送设置一个id
.data(data.getT2().toString())
.build());
}
private String getCountDownSec() {
if (count_down_sec>0) {
int h = count_down_sec/(60*60);
int m = (count_down_sec%(60*60))/60;
int s = (count_down_sec%(60*60))%60;
count_down_sec--;
return "活动倒计时:"+h+" 小时 "+m+" 分钟 "+s+" 秒";
}
return "活动倒计时:0 小时 0 分钟 0 秒";
}
//报头设置为 "text/event-stream",以便于发送事件流,这种写法等同于MediaType.TEXT_EVENT_STREAM_VALUE "text/event-stream;charset=UTF-8"
@GetMapping(value = "/retrieve",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public double retrieve() {
try {
//每0.5秒刷新数据
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟股票实时变动数据
return Math.ceil(Math.random() * 10000);
}
}
最后,测试运行结果;
总结
虽然参考了很多资料,对于响应式编程还是很陌生,写个demo后,依旧没有感受到它的精华,基于WebFlux框架实现SSE事件,不难看出来还是基于长连接的,在实际场景中,基于长连接的推送事件是否适用,还值得再思考。
参考资料:
https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web-reactive.html#webflux-dispatcher-handler
https://blog.csdn.net/wshl1234567/article/details/80320116
https://blog.csdn.net/Message_lx/article/details/81075766
https://www.cnblogs.com/Alandre/category/957422.html
https://segmentfault.com/a/1190000020686218?utm_source=tag-newest
https://my.oschina.net/bianxin/blog/3063713
html5服务器发送事件
https://www.runoob.com/html/html5-serversentevents.html
https://www.xttblog.com/spring-webflux.html
网友评论