sse实现流式数据接收:web、服务端
测试步骤
1.找个web项目,SSEController+SSEUtils运行起来
2.web方式: 浏览器打开sse.html
3.服务端方式:执行SSEClient main方法
SSEController
package com.example.demo.controller;
import com.example.demo.util.sse.SSEUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
 * Demo endpoint that opens a Server-Sent Events stream for a question id and
 * pushes a short series of test messages to it from a background thread.
 */
@RestController
@RequestMapping("/sse/websocket")
public class SSEController {

    /**
     * Subscribes the caller to the SSE stream identified by {@code questionId}.
     *
     * <p>After a one second delay, ten messages are published 500 ms apart and
     * the subscription is then closed. For real use, limit the number of
     * concurrent subscriptions; this demo starts one thread per request.
     *
     * <p>NOTE(review): {@link SSEUtils#addSub(String)} returns the already
     * registered emitter when the same id subscribes twice, so two concurrent
     * requests with the same id would share one emitter — confirm this is
     * acceptable for the intended use.
     *
     * @param questionId id of the stream to subscribe to
     * @return the emitter bound to this id, or {@code null} when the id is empty
     */
    @RequestMapping(value = "/sub", method = RequestMethod.GET, produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@RequestParam("questionId") String questionId) {
        // Register the subscription BEFORE starting the publisher so that no
        // message can be published while the emitter is still missing.
        SseEmitter emitter = SSEUtils.addSub(questionId);
        if (emitter == null) {
            // Rejected id (empty): nothing to publish to, so do not start a thread.
            return null;
        }

        // Simple asynchronous publisher ====
        Thread publisher = new Thread(() -> {
            try {
                Thread.sleep(1000);
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(500);
                    SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Close the subscription once all messages have been sent.
                SSEUtils.closeSub(questionId);
            }
        }, "sse-demo-publisher");
        publisher.start();
        // =================

        return emitter;
    }
}
SSEUtils
package com.example.demo.util.sse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static registry of {@link SseEmitter} subscriptions keyed by question id.
 *
 * <p>All state lives in a {@link ConcurrentHashMap}; registration uses an
 * atomic {@code computeIfAbsent} and removals are value-matched so that a stale
 * callback cannot evict a newer emitter registered under the same id.
 */
public class SSEUtils {

    /** Emitter timeout in milliseconds (two minutes). */
    private static final long DEFAULT_TIME_OUT = 2 * 60 * 1000L;

    /** Subscription table: question id to its emitter. */
    private static final Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();

    /**
     * Adds a subscription for the given id, or returns the existing one.
     *
     * @param questionId subscription key
     * @return the emitter registered under {@code questionId}, or {@code null}
     *         when {@code questionId} is {@code null} or empty
     */
    public static SseEmitter addSub(String questionId) {
        if (null == questionId || "".equals(questionId)) {
            return null;
        }
        // Atomic get-or-create: two concurrent callers can no longer both
        // create an emitter and overwrite each other's map entry.
        return subscribeMap.computeIfAbsent(questionId, SSEUtils::newEmitter);
    }

    /**
     * Creates an emitter that removes itself from the table when it completes
     * or times out, so finished or abandoned streams do not accumulate.
     */
    private static SseEmitter newEmitter(String questionId) {
        SseEmitter emitter = new SseEmitter(DEFAULT_TIME_OUT);
        // Value-matched remove: only evicts this exact emitter instance.
        Runnable evict = () -> subscribeMap.remove(questionId, emitter);
        emitter.onCompletion(evict);
        emitter.onTimeout(evict);
        return emitter;
    }

    /**
     * Publishes a message to the subscriber of the given id, if any.
     *
     * <p>Delivery is best effort: a failed send never propagates to the caller.
     * The failing emitter is dropped from the table so later messages are not
     * sent to it, and the failure is reported on standard error.
     *
     * @param questionId subscription key; ignored when {@code null} or unknown
     * @param msg        payload sent as the event's data
     */
    public static void pubMsg(String questionId, String msg) {
        if (null == questionId) {
            return;
        }
        SseEmitter emitter = subscribeMap.get(questionId);
        if (null == emitter) {
            return;
        }
        try {
            // See SseEmitter.event() for richer event structure (id, name, retry).
            emitter.send(SseEmitter.event().data(msg));
        } catch (Exception e) {
            subscribeMap.remove(questionId, emitter);
            System.err.println("SSE send failed for questionId=" + questionId + ": " + e);
        }
    }

    /**
     * Closes and unregisters the subscription for the given id, if any.
     *
     * @param questionId subscription key; ignored when {@code null} or unknown
     */
    public static void closeSub(String questionId) {
        if (null == questionId) {
            return;
        }
        // Unregister first so the entry is gone even if complete() throws.
        SseEmitter emitter = subscribeMap.remove(questionId);
        if (null != emitter) {
            try {
                emitter.complete();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
SSEClient
package com.example.demo.util.sse;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
/**
 * Minimal server-side client for a Server-Sent Events endpoint, built on
 * {@link HttpURLConnection}. It opens the stream, reads it line by line and
 * hands every line to a {@link MsgHandler}.
 */
public class SSEClient {

    /** Connect/read timeout in milliseconds (two minutes). */
    private static final int DEFAULT_TIME_OUT = 2 * 60 * 1000;

    /** Field prefix that marks a data line in the event stream. */
    private static final String DATA_PREFIX = "data:";

    /**
     * Opens the SSE endpoint and returns its response body as a buffered stream.
     *
     * @param urlPath     absolute URL of the SSE endpoint
     * @param timeoutMill connect and read timeout in milliseconds
     * @return buffered response stream; the caller is responsible for closing it
     *         (for example by passing it to {@link #readStream})
     * @throws IOException if the connection cannot be opened or the response
     *                     body cannot be obtained
     */
    public static InputStream getSseInputStream(String urlPath, int timeoutMill) throws IOException {
        HttpURLConnection urlConnection = getHttpURLConnection(urlPath, timeoutMill);
        return new BufferedInputStream(urlConnection.getInputStream());
    }

    /**
     * Reads the stream line by line until end of stream and passes each line,
     * including the blank separator lines, to {@code msgHandler}. The stream is
     * always closed before this method returns.
     *
     * <p>A read failure is reported on standard error and ends the loop without
     * being rethrown; only a failure while closing propagates.
     *
     * @param is         stream to read; closed by this method
     * @param msgHandler callback invoked once per line
     * @throws IOException if closing the stream fails
     */
    public static void readStream(InputStream is, MsgHandler msgHandler) throws IOException {
        // Event streams are UTF-8 encoded, so do not rely on the platform charset.
        // Closing the reader also closes the wrapped input stream.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    msgHandler.handleMsg(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Extracts the payload of a {@code data:} line.
     *
     * <p>Everything after the first colon is kept, so payloads that contain
     * colons are not truncated, and an empty payload yields an empty string.
     * One leading space after the colon, if present, is dropped.
     *
     * @param line one line read from the event stream; may be {@code null}
     * @return the payload, or {@code null} when {@code line} is {@code null} or
     *         is not a data line
     */
    static String extractData(String line) {
        if (line == null || !line.startsWith(DATA_PREFIX)) {
            return null;
        }
        String payload = line.substring(DATA_PREFIX.length());
        return payload.startsWith(" ") ? payload.substring(1) : payload;
    }

    /**
     * Builds a GET connection configured for reading an event stream.
     *
     * @param urlPath     absolute URL of the SSE endpoint
     * @param timeoutMill connect and read timeout in milliseconds
     * @return the configured, not yet connected, connection
     * @throws IOException if the URL is malformed or the connection cannot be created
     */
    private static HttpURLConnection getHttpURLConnection(String urlPath, int timeoutMill) throws IOException {
        URL url = new URL(urlPath);
        HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
        // Read-only GET: no request body is written, so output is not enabled
        // and no Content-Type is sent.
        urlConnection.setDoInput(true);
        urlConnection.setUseCaches(false);
        urlConnection.setRequestMethod("GET");
        urlConnection.setRequestProperty("Connection", "Keep-Alive");
        urlConnection.setRequestProperty("Accept", "text/event-stream");
        urlConnection.setRequestProperty("Cache-Control", "no-cache");
        // Bound both the connection attempt and each blocking read.
        urlConnection.setConnectTimeout(timeoutMill);
        urlConnection.setReadTimeout(timeoutMill);
        return urlConnection;
    }

    /** Callback that receives every line read from the event stream. */
    @FunctionalInterface
    public interface MsgHandler {
        /**
         * Handles one line of the stream.
         *
         * @param line the line without its line terminator; may be empty
         */
        void handleMsg(String line);
    }

    /**
     * Demo entry point: subscribes once and prints each data payload.
     *
     * <p>For a concurrent test, submit the same subscribe-and-read sequence to
     * an {@code ExecutorService} (for example a fixed pool of 100 threads), one
     * task per question id such as {@code "kingtao" + i}, using the same
     * {@code /sse/websocket/sub} path, and shut the executor down afterwards.
     */
    public static void main(String[] args) throws Exception {
        // Single subscription.
        String urlPath = "http://localhost:8089/sse/websocket/sub?questionId=kingtao3";
        InputStream inputStream = getSseInputStream(urlPath, DEFAULT_TIME_OUT);
        readStream(inputStream, line -> {
            // Parse according to the agreed message protocol.
            String msg = extractData(line);
            if (msg != null) {
                System.out.println(msg);
            }
        });
    }
}
sse.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>sse</title>
</head>
<body>
<div>
    <label>问题id</label>
    <input type="text" id="questionId">
    <button onclick="subscribe()">订阅</button>
    <hr>
    <label>F12-console控制台查看消息</label>
</div>
<script>
    /**
     * Opens an EventSource for the question id typed into the input field and
     * logs every received message to the browser console.
     */
    function subscribe() {
        const questionId = document.getElementById('questionId').value;
        if (!questionId) {
            // An empty id is rejected by the server, so do not open a connection.
            console.warn('questionId is empty');
            return;
        }
        // Encode the id so characters such as '&', '#' or spaces cannot break the query string.
        const url = 'http://localhost:8089/sse/websocket/sub?questionId=' + encodeURIComponent(questionId);
        const eventSource = new EventSource(url);
        eventSource.onmessage = function (e) {
            console.log(e.data);
        };
        eventSource.onopen = function (e) {
            // todo
        };
        eventSource.onerror = function (e) {
            // todo
            // Closing here stops the browser from reconnecting automatically.
            eventSource.close();
        };
    }
</script>
</body>
</html>
网友评论