服务端
import { IncomingMessage, ServerResponse, OutgoingMessage } from 'node:http';
export default fromNodeMiddleware((req: IncomingMessage, res: ServerResponse) => {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.flushHeaders();
sendSSEData(res)
})
// 发送 SSE 格式的数据
function sendSSEData(response: ServerResponse) {
let messageId = 0;
let id = setInterval(() => {
const eventData = {
id: messageId,
timestamp: Date.now(),
message: 'This is a SSE event message',
};
const eventDataStr = `id: ${eventData.id}\n` +
`event: message\n` +
`data: ${JSON.stringify(eventData)}\n\n`;
// response.write(JSON.stringify(eventData))
response.write(eventDataStr);
if(messageId == 20) {
response.end();
clearInterval(id)
}
messageId++;
}, 1000); // 每秒发送一次数据
// 当客户端断开连接时清除定时器
response.on('close', () => {
console.log("客户端断开链接")
clearInterval(id);
});
// 当客户端请求结束时清除定时器
response.on('end', () => {
console.log("客户端请求结束")
clearInterval(id);
});
}
客户端
// fetchStream.ts
type EvnetName = "message";
class FetchStream {
url = "";
options:any = {};
#controller:any = null;#controllerReadableStream:any = null;
#eventList:{[name : string]: (val: string) => void} = {};
#isRequest = true
constructor(url: string, options:any) {
this.url = url;
this.options = options;
this.#controller = new AbortController();
}
async #push(controller: any, reader: any) {
const { value, done } = await reader.read();
if (done || !this.#isRequest) {
this.#controller.abort();
controller.close();
} else {
this.#writeEvent('message', new TextDecoder().decode(value))
controller.enqueue(value);
this.#push(controller, reader);
}
}
async request() {
const _self = this;
useAsyncData("request", () => $fetch(this.url, {
headers: this.options?.headers || {},
responseType: "stream",
signal: _self.#controller.signal,
async onResponse({ response }) {
console.log("sse 连接成功")
const reader = response.body?.getReader();
new ReadableStream({
start(controller) {
_self.#push(controller, reader)
},
})
},
}), {
server: this.options?.server || false
})
}
async close() {
this.#isRequest = false;
}
on(eventName: EvnetName, callback: (val: string) => void) {
if(!this.#eventList.hasOwnProperty(eventName)) {
this.#eventList[eventName] = callback;
} else {
console.error(`${eventName}不能重复绑定`)
}
}
#writeEvent(evnetName: string, val:string) {
let _callback = this.#eventList[evnetName];
if(!_callback) return;
_callback(val);
}
}
export default FetchStream;
使用
import FetchStream from "./fetchStream";
fetchStream = new FetchStream(url, {
headers: {
'accept': 'text/event-stream',
'Content-Type': 'application/json',
},
});
// 发起请求
fetchStream.request();
// 断开请求
setTimeout(() => {
fetchStream.close();
}, xx);
网友评论