美文网首页Web 前端开发
express + websocket 实现单点推送

express + websocket 实现单点推送

作者: Obvious_96 | 来源:发表于2019-12-27 14:29 被阅读0次

    本文简要介绍一个使用websocket协议实现单点推送的小demo的实现

    github:https://github.com/SMIELPF/websocket-demo

    websocket

    WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
    它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,>是真正的双向平等对话,属于服务器推送技术的一种。

    其他特点包括:

    • 建立在 TCP 协议之上,服务器端的实现比较容易。
    • 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
    • 数据格式比较轻量,性能开销小,通信高效。
    • 可以发送文本,也可以发送二进制数据。
    • 没有同源限制,客户端可以与任意服务器通信。
    • 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

    在浏览器端,HTML5已经提供了Websocket API,而在服务端,
    也有许多优秀的第三方库提供对websocket的支持,例如在Node.js中比较常用的就有socket.io, express-ws等,下面我们就用express-ws来实现一个简单的websocket通信的小demo

    客户端实现

    我们实现这样一个web客户端

    image
    可以通过下拉框选择消息发送者和接收者,点击发送按钮后,通过http post请求告知服务端消息的发送者,接收者以及消息内容,然后服务端通过websocket向消息接收者推送消息。
    客户端与服务端建立url为ws://{host}/ws/:name的websocket连接,其中name为消息发送者,当发送者改变时,关闭上一条连接,建立新的连接,例如消息发送方从Bob变为Alice, 则关闭ws://{host}/ws/Bob, 建立ws://{host}/ws/Alice, 这样我们就区分开了客户端,方便之后进行单点推送。

    前端html:

    <!DOCTYPE html>
    <html>
      <head>
      <title>websocket demo</title>
        <meta name="viewport" content="width=device-width, initial-scale=1" />
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
        <style>
            .row {
                margin: 1rem
            }
        </style>
      </head>
      <body>
        <div class='row'>发送方: 
          <select id='sender'>
            <option value="Bob" selected>Bob</option>
            <option value="Alice">Alice</option>
            <option value="Jack">Jack</option>
          </select>
        </div>
        <div class='row'>接收方: 
          <select id='receiver'>
            <option value="Bob">Bob</option>
            <option value="Alice" selected>Alice</option>
            <option value="Jack">Jack</option>
          </select>
        </div>
        <textarea id='msg' class='row' rows="10" cols="30"></textarea>
        <div class='row'>
            <button id='sendBtn'>发送</button>
        </div>
        <h3 class='row'>收到的消息:</h3>
        <div id='conversation' class='row'></div>
        <script src="/bundle.js"></script>
      </body>
    </html>
    

    前端js代码:

    var sender = document.getElementById('sender');
    var receiver = document.getElementById('receiver');
    var conversation = document.getElementById('conversation');
    var sendBtn = document.getElementById('sendBtn');
    var socket = null;
    var createSocket = function() {
        if(socket) {
            socket.close();
        }
        var url = 'ws://' + window.location.host + '/ws/' + sender.options[sender.selectedIndex].value;
        socket = new WebSocket(url);
        socket.onopen = function() {
            console.log('connected to ' + url);
        }
        socket.onmessage = function(event) {
            var data = JSON.parse(event.data);
            conversation.innerHTML = conversation.innerHTML + data.from + ':' + data.content + '<br/>'; 
        }
        socket.onclose = function() {
            console.log('close connect to' + url);
        }
    };
    
    var sendMessage = function() {
        var msg = document.getElementById('msg').value;
        fetch('/rest/message', {
            method: 'POST',
            headers: {
                'Content-type': 'application/json'
            },
            body: JSON.stringify({
                from: sender.options[sender.selectedIndex].value,
                content: msg,
                to: receiver.options[receiver.selectedIndex].value
            }) 
        }).then(res => {
            return res.json();
        }).then(data => {
            if(!data.succeed) {
                alert(data.msg);
            }
        })
    };
    
    sender.onchange = function() {
        createSocket();
    }
    
    sendBtn.onclick = function() {
        sendMessage();
    }
    
    createSocket();
    

    服务端实现

    服务端实现依赖express和express-ws,
    主要实现两个接口,一个是websocket接口,一个是http接口

    websocket接口的实现如下:

    const app = new express();
    expressWs(app);
    
    const wsClients = {}
    app.wsClients = wsClients;
    
    app.ws('/ws/:wid',  (ws, req) => {
        if(!wsClients[req.params.wid]) {
            wsClients[req.params.wid] = []
        }
        // 将连接记录在连接池中
        wsClients[req.params.wid].push(ws);
        ws.onclose = () => {
            // 连接关闭时,wsClients进行清理
            wsClients[req.params.wid] = wsClients[req.params.wid].filter((client) => {
                return client !== ws;
            });
            if(wsClients[req.params.wid].length === 0) {
                delete wsClients[req.params.wid];
            }
        }
    });
    

    首先声明一个连接池wsClients, 这是一个对象,键为消息发送方的名字,值是一个数组,用于保存所有对应的websocket连接实例。当一个websocket连接建立时,我们把连接记录在连接池中,并在onclose方法中声明连接关闭时清理连接池的回调。
    http接口的实现如下:

    app.post('/rest/message', (req, res) => {
        const to = req.body.to; // 接收方id
        const from = req.body.from; // 发送发id
        const result = { succeed: true };
        if(wsClients[to] !== undefined) {
            wsClients[to].forEach((client) => {
                client.send(JSON.stringify({
                    from,
                    content: req.body.content
                }));
            });
        } else {
            // 如果消息接收方没有连接,则返回错误信息
            result.succeed = false;
            result.msg = '对方不在线';
        }
        res.json(result);
    });
    

    从http请求的body中获取消息发送方和接收方,然后从连接池中遍历所有消息接收方的websocket连接实例,向客户端推送消息

    完整的服务端代码如下,在实现基本功能的基础上,定时打印一下连接池中websocket连接的数量:

    const express = require('express');
    const expressWs = require('express-ws');
    
    const app = new express();
    expressWs(app);
    
    const wsClients = {}
    app.wsClients = wsClients;
    app.use(express.json());
    app.use(express.urlencoded({ extended: false }));
    app.use(express.static('./static'));
    
    app.ws('/ws/:wid',  (ws, req) => {
        if(!wsClients[req.params.wid]) {
            wsClients[req.params.wid] = []
        }
        // 将连接记录在连接池中
        wsClients[req.params.wid].push(ws);
        ws.onclose = () => {
            // 连接关闭时,wsClients进行清理
            wsClients[req.params.wid] = wsClients[req.params.wid].filter((client) => {
                return client !== ws;
            });
            if(wsClients[req.params.wid].length === 0) {
                delete wsClients[req.params.wid];
            }
        }
    });
    
    app.post('/rest/message', (req, res) => {
        const to = req.body.to; // 接收方id
        const from = req.body.from; // 发送发id
        const result = { succeed: true };
        if(wsClients[to] !== undefined) {
            wsClients[to].forEach((client) => {
                client.send(JSON.stringify({
                    from,
                    content: req.body.content
                }));
            });
        } else {
            // 如果消息接收方没有连接,则返回错误信息
            result.succeed = false;
            result.msg = '对方不在线';
        }
        res.json(result);
    });
    
    setInterval(() => {
        // 定时打印连接池数量
        console.log('websocket connection counts:')
        Object.keys(wsClients).forEach(key => {
            console.log(key, ':', wsClients[key].length);
        })
        console.log('-----------------------------');
    }, 5000);
    
    app.listen(3000, () => {
        console.log('visit http://localhost:3000');
        // child_process.execSync('start http://localhost:3000');
    });
    

    思考

    现在我们就实现了一个简单的websocket通信的小demo,但是现在这种实现方式是在处理http post请求的过程中向客户端使用websocket推送消息,如果服务端是单节点部署倒是没什么问题

    image
    但是如果服务是部署在多个节点上,就会出现部分客户端收不到服务端推送的情况
    image
    如上图所示,Bob1想给Alice发消息,但是只有跟Bob1负载在同一节点的Alice1能收到服务端的推送,Alice2就收不到了。这种时候我们就需要利用Redis的pub/sub或者kafka这样的中间件了
    image
    正所谓,在计算机领域,如果有什么事是加一个中间层搞不定的,那就加两个。
    关于websocket集群的实现,可以看一下这一篇:关于一个 websocket 多节点分布式问题的头条前端面试题

    本demo的代码:github:https://github.com/SMIELPF/websocket-demo

    觉得有帮助的话欢迎点赞,点一点star哦 : )

    相关文章

      网友评论

        本文标题:express + websocket 实现单点推送

        本文链接:https://www.haomeiwen.com/subject/daldoctx.html