美文网首页
Nuxt3 实现SSE

Nuxt3 实现SSE

作者: 空腹无才 | 来源:发表于2023-10-29 15:39 被阅读0次

服务端

import { IncomingMessage, ServerResponse, OutgoingMessage } from 'node:http';

/**
 * SSE endpoint handler, wrapped with `fromNodeMiddleware` so it receives the
 * raw Node request/response pair.
 *
 * Sets the event-stream response headers, flushes them, and then hands the
 * response to `sendSSEData`, which writes the events.
 */
export default fromNodeMiddleware((req: IncomingMessage, res: ServerResponse) => {
    // Headers are applied in the listed order.
    const sseHeaders: Array<[string, string]> = [
        ['Content-Type', 'text/event-stream'],
        ['Cache-Control', 'no-cache'],
        ['Connection', 'keep-alive'],
    ];
    for (const [headerName, headerValue] of sseHeaders) {
        res.setHeader(headerName, headerValue);
    }

    // Send the headers now, before the first event is written.
    res.flushHeaders();

    sendSSEData(res);
})

/**
 * Writes demo data to the response in SSE wire format.
 *
 * One event is written per second. Each event carries `id`, `event: message`
 * and a JSON `data` payload, and is terminated by a blank line. After the
 * event with id 20 has been written the response is ended.
 *
 * The timer is stopped when the response emits `close` or `finish`.
 *
 * @param response Node response whose SSE headers have already been set.
 */
function sendSSEData(response: ServerResponse): void {
    const sendIntervalMs = 1000; // one event per second
    const lastMessageId = 20; // the response is ended after this id is sent

    let messageId = 0;

    const timer = setInterval(() => {
        const eventData = {
            id: messageId,
            timestamp: Date.now(),
            message: 'This is a SSE event message',
        };

        // SSE framing: "field: value" lines, with a blank line ending the event.
        const eventDataStr = `id: ${eventData.id}\n` +
            `event: message\n` +
            `data: ${JSON.stringify(eventData)}\n\n`;

        response.write(eventDataStr);
        if (messageId === lastMessageId) {
            // Stop the timer first so no write can be scheduled after end().
            clearInterval(timer);
            response.end();
        }
        messageId++;
    }, sendIntervalMs);

    // Stop the timer when the connection is closed (e.g. the client disconnects).
    response.on('close', () => {
        console.log("客户端断开链接")
        clearInterval(timer);
    });

    // Stop the timer once the response has been fully sent. ServerResponse is
    // a writable stream: it emits 'finish', not 'end', so an 'end' listener
    // would never run.
    response.on('finish', () => {
        console.log("客户端请求结束")
        clearInterval(timer);
    });
}

客户端

// fetchStream.ts

/** Names of the events a FetchStream can emit. */
type EventName = "message";

/** Listener invoked with each decoded text chunk of the response body. */
type StreamListener = (val: string) => void;

/**
 * Streaming client built on `$fetch` with `responseType: "stream"`.
 *
 * Each chunk of the response body is decoded as UTF-8 text and passed to the
 * listener registered for the "message" event. The text is delivered as
 * received; SSE fields (`id:`, `event:`, `data:`) are not parsed here.
 */
class FetchStream {
    url = "";
    // Kept as `any` for backward compatibility; only `headers` and `server`
    // are read by this class.
    options: any = {};
    #controller: AbortController;
    #listeners = new Map<string, StreamListener>();
    #isRequest = true;

    /**
     * @param url     Endpoint to request.
     * @param options Optional settings: `headers` (request headers) and
     *                `server` (forwarded to `useAsyncData`, defaults to false).
     */
    constructor(url: string, options: any) {
        this.url = url;
        this.options = options;
        this.#controller = new AbortController();
    }

    /**
     * Reads the body until it ends or `close()` is called, emitting each
     * decoded chunk as a "message" event.
     */
    async #pump(reader: ReadableStreamDefaultReader<Uint8Array>): Promise<void> {
        // One decoder for the whole stream, used in streaming mode, so a
        // multi-byte character split across two chunks is decoded correctly.
        const decoder = new TextDecoder();
        try {
            while (this.#isRequest) {
                const { value, done } = await reader.read();
                if (done || !this.#isRequest) break;

                const text = decoder.decode(value, { stream: true });
                // An empty string means the chunk ended mid-character; the
                // bytes are held by the decoder until the next chunk.
                if (text) this.#writeEvent("message", text);
            }
            if (this.#isRequest) {
                // Flush any bytes still buffered in the decoder.
                const tail = decoder.decode();
                if (tail) this.#writeEvent("message", tail);
            }
        } catch (err: unknown) {
            // close() aborts the request, which rejects the pending read;
            // only report failures that were not caused by close().
            if (this.#isRequest) console.error("sse 读取失败", err);
        } finally {
            this.#controller.abort();
        }
    }

    /** Starts the request; chunks are delivered through the "message" listener. */
    async request(): Promise<void> {
        useAsyncData("request", () => $fetch(this.url, {
            headers: this.options?.headers || {},
            responseType: "stream",
            signal: this.#controller.signal,
            onResponse: ({ response }: { response: Response }) => {
                console.log("sse 连接成功")
                const reader = response.body?.getReader();
                // A response without a body has nothing to read.
                if (!reader) return;
                // Not awaited, so this hook returns immediately; #pump
                // handles its own errors.
                void this.#pump(reader);
            },
        }), {
            server: this.options?.server || false
        })
    }

    /** Stops delivering events and aborts the underlying request immediately. */
    async close(): Promise<void> {
        this.#isRequest = false;
        this.#controller.abort();
    }

    /**
     * Registers the listener for an event. Only one listener per event is
     * allowed; a second registration is rejected with an error log.
     */
    on(eventName: EventName, callback: StreamListener): void {
        if (this.#listeners.has(eventName)) {
            console.error(`${eventName}不能重复绑定`)
            return;
        }
        this.#listeners.set(eventName, callback);
    }

    /** Calls the listener registered for `eventName`, if there is one. */
    #writeEvent(eventName: string, val: string): void {
        this.#listeners.get(eventName)?.(val);
    }
}

export default FetchStream;

使用

import FetchStream from "./fetchStream";

// Create the client. `url` is the SSE endpoint and must be provided by the caller.
// Declared with `const`: assigning to an undeclared name throws in ES modules.
const fetchStream = new FetchStream(url, {
    headers: {
        'accept': 'text/event-stream',
        'Content-Type': 'application/json',
    },
});

// Start the request.
fetchStream.request();

// Disconnect after a delay; `xx` is a placeholder for the delay in milliseconds.
setTimeout(() => {
    fetchStream.close();
}, xx);

相关文章

网友评论

      本文标题:Nuxt3 实现SSE

      本文链接:https://www.haomeiwen.com/subject/fojvidtx.html