美文网首页
WebSocket 的实现

WebSocket 的实现

作者: 空腹无才 | 来源:发表于2020-08-23 16:30 被阅读0次

    网络协议的

    长连接: 一个链接上可以连续发送多个数据包,在链接期间,如果没有数据包发送,需要双方发链路检查包

    TCP/IP: TCP/IP 属于传输层,主要解决网络中的数据传输问题,只管传输数据。但这样对传输的数据没有一个规范的封装、解析等处理。使得传输的数据难以识别,所以才有了应用层协议对数据进行的封装、解析等,如http协议。

    HTTP: HTTP协议是应用层协议,用于分装解析传输数据。 从HTTP1.1开始其实就默认开启了长链接,也就是请求头header中可以看到Connection:Keep-alive。但是长连接只是说保持了(服务器可以告诉客户端保持时间Keep-Alive:timeout=20;max=20;)这个TCP通道,并采用服务器和客户端应答模式(Request-Response),不需要再创建一个链接通道,做到一个性能优化。

    socket: 与HTTP协议不一样,socket不是协议,他是在程序层面上对传输层协议(像TCP/IP)的接口封装。我们知道传输层的协议,是解决数据在网络中传输的问题的,那么socket(套接字)就是传输通道两端的接口。

    Websocket: WebSocket是包装成了一个应用层协议作为socket,从而能够让客户端和远程服务端通过web建立全双工通信。

    WebSocket API (客户端)

    WebSocket API 是HTML5 推出的东西。在客户端我们可以通过HTML5 所提供的API 对websocket 进行创建、发送数据、监听信息、监听报错等功能(HTML5 WebSocket)

    if("WebSocket" in window) {
        //创建WebSocket实例,可以使用ws和wss。第二个参数可以选填自定义协议,如果多协议,可以以数组方式
        let ws = new WebSocket("ws://127.0.0.1:8091");
        
        // 用于指定连接成功后的回调函数。
        ws.onopen = (event) => {
            console.log("WebSocket 链接成功");
        }
        
        // 用于指定当从服务器接受到信息时的回调函数。
        ws.onmessage = (data) => {
            console.log(data);
        }
        
        // 用于指定连接关闭后的回调函数。
        ws.onerror = (e) => {
            console.error("WebSocket error observed:", e);
        }
        
        //....
    else {
        alert("该浏览器不支持 WebSocket")
    }
    

    WebSocket (Node 服务端)

    我们知道WebSocket 是在Socket的基础上实现的,所以我们要做的是对现有的Socket协议进行升级。

    步骤: 客户端发送websocket请求-->服务端接受并识别该请求-->对该请求协议进行升级--> 返回给客户端 --> websocket 通道建立 --> 客户端/服务端发送数据

    协议升级

    // 创建websocket
    const http = require("http");
    const crypto = require("crypto");
    
    const MAGIC_STRINC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // 固定值
    const port = 8091;
    
    let server = http.createServer();
    
    // http服务器部分
    server.on("request", (req, res) => {
        res.end("websocket test");
    })
    
    // upgrade 请求(升级通讯协议)
    server.on("upgrade", (req, socket, head) => {
        // 加密 Sec-Websocket-Accept 值
        const swa = crypto.createHash('sha1').update(req.headers['sec-websocket-key'] + MAGIC_STRINC).digest('base64');
        
        // 构造响应头部
        let resHeaders = ([
            'HTTP/1.1 101 Switching Protocols', //必需。响应头。状态码为101。任何非101的响应都为握手未完成。但是HTTP语义是存在的。
            'Upgrade: websocket', // 必需。升级类型。
            'Connection: Upgrade', //必需。本次连接类型为升级。
            `Sec-Websocket-Accept: ${swa}` //必需。表明服务器是否愿意接受连接。如果接受,值就必须是通过上面算法得到的值。
        ]).concat('','').join('\r\n');
        
        // 返回升级协议信息  完成WebSocket通道建立
        socket.write(resHeaders);
    })
    
    // 启动服务器
    server.listen(port, ()=> {
        let dateTime = 
            (new Date()).getFullYear() +
            "-" + ((new Date()).getMonth() + 1) +
            "-" + (new Date()).getDate() +
            " " + (new Date()).getHours() +
            ":" + (new Date()).getMinutes() +
            ":" + (new Date()).getSeconds();
        console.log(`${dateTime} server start success: 127.0.0.1:${port}`)
    })
    
    

    在这里需要注意的是头部信息和头部信息中的Sec-Websocket-Accept的值。

    该值需要是一个通过base64加密的哈希值(sha1)。 而该加密所用的数据是客户端传过来的sec-websocket-key的值和MAGIC_STRINC内的固定值。 对MAGIC_STRINC的说明

    WebSocket 数据传输

    Webscoket 中传输的数据是数据帧(frame)

    数据帧有多种类型 主要有:文本型、二进制数据

    数据帧结构

    68747470733a2f2f692e696d6775722e636f6d2f79506350784a332e706e67.png

    每一列代表一个字节,一个字节8位,每一位又代表一个二进制数。

    • fin: 标识这一帧数据是否是该分块的最后一帧。
        1 为最后一帧
        0 不是最后一帧。需要分为多个帧传输
    
    • RSV1、RSV2、RSV3
    扩展字段,除非一个扩展经过协商赋予了非零值的某种含义,否则必须为0
    
    • opcode
    解释 payload data 的类型,如果收到识别不了的opcode,直接断开。
    分类值如下:
    %x0:连续的帧 
    %x1:text帧 
    %x2:binary帧 
    %x3 - 7:为非控制帧而预留的 
    %x8:关闭握手帧 
    %x9:ping帧 
    %xA:pong帧 
    %xB - F:为非控制帧而预留的
    
    • masked: 占第二个字节的一位,定义了masking-key是否存在。并且使用masking-key掩码解析Payload data。
        1 客户端发送数据到服务端
        0 服务端发送数据到客户端
    
    • payload length: 表示Payload data的总长度。占7位,或者7+2个字节、或者7+8个字节。
        0-125,则是payload的真实长度
        126,则后面2个字节形成的16位无符号整型数的值是payload的真实长度,125<数据长度<65535
        127,则后面8个字节形成的64位无符号整型数的值是payload的真实长度,数据长度>65535
    
    • masking key: 0或4字节,当masked为1的时候才存在,为4个字节,否则为0,用于对我们需要的数据进行解密
    • payload data: 我们需要的数据,如果masked为1,该数据会被加密,要通过masking key进行异或运算解密才能获取到真实数据。

    创建数据帧

    // 创建数据帧
    function createDate(data) {
        let dataType = Buffer.isBuffer(data); // 判断数据是否是buffer类型
        let dataBuf, // 需要发送的二进制数据
            dataLength, // 数据真实长度
            dataIndex = 2; // 数据的起始位置
        let frame; // 用来存储封装好的数据帧
    
        if(dataType) {
            dataBuf = data;
        } else {
            dataBuf = Buffer.from(data);
        }
    
        dataLength = dataBuf.byteLength;
    
        // 计算payload data在frame中的起始位置
        dataIndex = dataIndex + (dataLength > 65535 ? 8 : (dataLength > 125 ? 2 : 0));
        // 创建多大空间
        frame = new Buffer.alloc(dataIndex + dataLength);
    
        // 第一个字节, fin = 1, opcode = 1
        frame[0] = parseInt(10000001, 2);
    
        // 长度超过65535 的由8个字节表示
        if(dataLength > 65535) {
            frame[1] = 127; // 第二个字节
            frame.writeUInt32BE(0, 2); // (值, 写入之前要跳过的位置)
            frame.writeUInt32BE(dataLength, 6);
        } else if (dataLength > 125) {
            frame[1] = 126;
            frame.writeUInt16BE(dataLength, 2);
        } else {
            frame[1] = dataLength;
        }
    
        // 发送给您客户端的数据
        frame.write(dataBuf.toString(), dataIndex); // (数据, 数据写入到buffer的位置)
    
        // this.write() == socket.write()  发送数据给客户端
        this.write(frame);
    
    }
    

    解数据帧

    // 获取客户端数据状态
    function getHandleDateState(data) {
            let dataIndex = 2; // 数据索引 应为第一个字节和第二个字节肯定不是数据,所以数据从初始值2开始
            let secondeByte = data[1]; // 代表masked位和可能是payloadLength位的第二个字节
            let hasMask = secondeByte >= 128; // 如果大于大于或等一128, 说明masked为1
    
            let dataLength, maskedData;
    
            // 如果数据为126, 则后面16位长度为数据位,如果为127 则后面64位长度的数据为数据长度
            if(secondeByte == 126) {
                dataIndex += 2;
                dataLength = data.readUInt16BE(2);
    
            } else if(secondeByte == 127) {
                dataIndex += 8;
                dataLength = data.readUInt32BE(2) + data.readUInt32BE(6);
            } else {
                dataLength = secondeByte;
            }
    
            // 如果有掩码, 则获取32位的二进制masking key, 同时更新index
            if(hasMask) {
                maskedData = data.slice(dataIndex, dataIndex + 4);
                dataIndex += 4;
            }
    
            // 数据量最大位10kb
            if(dataLength > 10240) {
                this.send("warning : data limit 10kb");
            } else {
                // dataIndex 位数据位的起始位置, datalength 位数据长度, maskedData 为二进制的解密数据
                this.stat = {
                    index: dataIndex,
                    totalLength: dataLength,
                    length: dataLength,
                    maskedData: maskedData,
                    opcode: parseInt(data[0].toString(16).split("")[1], 16),
                }
            }
    }
    
    
    
    // 解数据帧
    function decode(data, key) {
        getHandleDateState.apply(this, [data]);
        let stat;
        this.datas;
    
        if(!(stat = this.stat)) return;
    
        // 如果opcode为9, 则发送pong响应, 如果opcode为10则置pingtimes 为0
        if(stat.opcode === 9 || stat === 10) {
            (stat.opcode === 9) ? (this.sendPong()) : (this.pingTimes = 0);
            this.reset();
            return;
        } 
        else if(stat.opcode === 8) {
            console.log(key)
            socketMap.delete(key);
            this.end();
            return;
        }
    
        let result;
        if (stat.maskedData) {
            result = Buffer.alloc(data.length-stat.index);
            for (var i = stat.index, j = 0; i < data.length; i++, j++) {
                //对每个字节进行异或运算,masked是4个字节,所以%4,借此循环
                result[j] = data[i] ^ stat.maskedData[j % 4];
            }
        } else {
            result = data.slice(stat.index, data.length);
        }
    
        this.datasd = result;
        stat.length -= (data.length - stat.index);
        //当长度为0,说明当前帧为最后帧
        if (stat.length == 0) {
            var buf = Buffer.concat(this.datas, stat.totalLength);
            console.log(stat.opcode)
            if (stat.opcode == 8) {
                this.close(buf.toString());
            } else {
                this.emit("message", buf.toString());
            }
    
            this.reset();
        }
        // 打印客户端信息
        console.log(this.datasd.toString())
    }
    

    心跳检查

    由于websocket 不进行交互会关闭通道所以,才有了心跳检查。

    // 心跳检测
    function checkHeartBeat (index) {
        this.pingTimes = 0; // 记录心跳次数
    
        let c =  setInterval(() => {
           if(this.pingTimes > 4) {
               this.end();
               socketMap.delete(index);
               clearInterval(c);
           } else {
               sendPring.apply(this, []);
               this.pingTimes++;
           }
        }, 5000);
    }
    
    // 发送心跳
    function sendPring() {
        this.write(Buffer.from(['0x89', '0x0']))
    }
    

    完整代码(服务端)

    // 创建websocket
    const http = require("http");
    const crypto = require("crypto")
    
    const port = 8091;
    const MAGIC_STRINC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // 固定值
    let socketMap = new Map();
    let count = 0;
    
    let server = http.createServer();
    
    
    // http服务器部分
    server.on("request", (req, res) => {
        res.end("websocket test");
    })
    
    // upgrade 请求(升级通讯协议)
    server.on("upgrade", (req, socket, head) => {
        // 加密 Sec-Websocket-Accept 值
        const swa = crypto.createHash('sha1').update(req.headers['sec-websocket-key'] + MAGIC_STRINC).digest('base64');
    
        // 构造响应头部
        let resHeaders = ([
            'HTTP/1.1 101 Switching Protocols',
            'Upgrade: websocket',
            'Connection: Upgrade',
            `Sec-Websocket-Accept: ${swa}`
        ]).concat('','').join('\r\n');
    
        socket.write(resHeaders);
        crateSocketMap(socket);
    })
    
    // socket Map
    function crateSocketMap (socket) {
        let index = count++;
        socketMap.set(index, socket);
    
        let number = parseInt(Math.random() * 10); 
        let c = setTimeout(() => {
            if(socketMap.get(index)) {
                createDate.call(socket, number.toString());
                number++;
            } else {
                clearInterval(c)
            }
        }, 2000);
    
        socket.on("data", (data) => {
            decode.apply(socket, [data, index]);
        })
    
        socket.on("error", (err) => {
            console.log(err)
        })
    
        checkHeartBeat.apply(socket, [index]);
    }
    
    // 心跳检测
    function checkHeartBeat (index) {
        this.pingTimes = 0; // 记录心跳次数
    
        let c =  setInterval(() => {
           if(this.pingTimes > 4) {
               this.end();
               socketMap.delete(index);
               clearInterval(c);
           } else {
               sendPring.apply(this, []);
               this.pingTimes++;
           }
        }, 5000);
    }
    
    // 发送心跳
    function sendPring() {
        this.write(Buffer.from(['0x89', '0x0']))
    }
    
    
    
    
    // 解数据帧
    function decode(data, key) {
        getHandleDateState.apply(this, [data]);
        let stat;
        this.datas;
    
        if(!(stat = this.stat)) return;
    
        // 如果opcode为9, 则发送pong响应, 如果opcode为10则置pingtimes 为0
        if(stat.opcode === 9 || stat === 10) {
            (stat.opcode === 9) ? (this.sendPong()) : (this.pingTimes = 0);
            this.reset();
            return;
        } 
        else if(stat.opcode === 8) {
            console.log(key)
            socketMap.delete(key);
            this.end();
            return;
        }
    
        let result;
        if (stat.maskedData) {
            result = Buffer.alloc(data.length-stat.index);
            for (var i = stat.index, j = 0; i < data.length; i++, j++) {
                //对每个字节进行异或运算,masked是4个字节,所以%4,借此循环
                result[j] = data[i] ^ stat.maskedData[j % 4];
            }
        } else {
            result = data.slice(stat.index, data.length);
        }
    
        this.datasd = result;
        stat.length -= (data.length - stat.index);
        //当长度为0,说明当前帧为最后帧
        if (stat.length == 0) {
            var buf = Buffer.concat(this.datas, stat.totalLength);
            console.log(stat.opcode)
            if (stat.opcode == 8) {
                this.close(buf.toString());
            } else {
                this.emit("message", buf.toString());
            }
    
            this.reset();
        }
        // 打印客户端信息
        console.log(this.datasd.toString())
    }
    
    
    // 获取客户端数据状态
    function getHandleDateState(data) {
            let dataIndex = 2; // 数据索引 应为第一个字节和第二个字节肯定不是数据,所以数据从初始值2开始
            let secondeByte = data[1]; // 代表masked位和可能是payloadLength位的第二个字节
            let hasMask = secondeByte >= 128; // 如果大于大于或等一128, 说明masked为1
    
            let dataLength, maskedData;
    
            // 如果数据为126, 则后面16位长度为数据位,如果为127 则后面64位长度的数据为数据长度
            if(secondeByte == 126) {
                dataIndex += 2;
                dataLength = data.readUInt16BE(2);
    
            } else if(secondeByte == 127) {
                dataIndex += 8;
                dataLength = data.readUInt32BE(2) + data.readUInt32BE(6);
            } else {
                dataLength = secondeByte;
            }
    
            // 如果有掩码, 则获取32位的二进制masking key, 同时更新index
            if(hasMask) {
                maskedData = data.slice(dataIndex, dataIndex + 4);
                dataIndex += 4;
            }
    
            // 数据量最大位10kb
            if(dataLength > 10240) {
                this.send("warning : data limit 10kb");
            } else {
                // dataIndex 位数据位的起始位置, datalength 位数据长度, maskedData 为二进制的解密数据
                this.stat = {
                    index: dataIndex,
                    totalLength: dataLength,
                    length: dataLength,
                    maskedData: maskedData,
                    opcode: parseInt(data[0].toString(16).split("")[1], 16),
                }
            }
    }
    
    // 创建数据帧
    function createDate(data) {
        let dataType = Buffer.isBuffer(data); // 判断数据是否是buffer类型
        let dataBuf, // 需要发送的二进制数据
            dataLength, // 数据真实长度
            dataIndex = 2; // 数据的起始位置
        let frame; // 用来存储封装好的数据帧
    
        if(dataType) {
            dataBuf = data;
        } else {
            dataBuf = Buffer.from(data);
        }
    
        dataLength = dataBuf.byteLength;
    
        // 计算payload data在frame中的起始位置
        dataIndex = dataIndex + (dataLength > 65535 ? 8 : (dataLength > 125 ? 2 : 0));
        // 创建多大空间
        frame = new Buffer.alloc(dataIndex + dataLength);
    
        // 第一个字节, fin = 1, opcode = 1
        frame[0] = parseInt(10000001, 2);
    
        // 长度超过65535 的由8个字节表示
        if(dataLength > 65535) {
            frame[1] = 127; // 第二个字节
            frame.writeUInt32BE(0, 2); // (值, 写入之前要跳过的位置)
            frame.writeUInt32BE(dataLength, 6);
        } else if (dataLength > 125) {
            frame[1] = 126;
            frame.writeUInt16BE(dataLength, 2);
        } else {
            frame[1] = dataLength;
        }
    
        // 发送给您客户端的数据
        frame.write(dataBuf.toString(), dataIndex); // (数据, 数据写入到buffer的位置)
    
        // this.write() == socket.write()  发送数据给客户端
        this.write(frame);
    
    }
    
    
    // 启动服务器
    server.listen(port, ()=> {
        let dateTime = 
            (new Date()).getFullYear() +
            "-" + ((new Date()).getMonth() + 1) +
            "-" + (new Date()).getDate() +
            " " + (new Date()).getHours() +
            ":" + (new Date()).getMinutes() +
            ":" + (new Date()).getSeconds();
        console.log(`${dateTime} server start success: 127.0.0.1:${port}`)
    })
    

    参考

    websocket与和他http的区别

    基于node实现websocket协议

    使用nodeJS在HTTP上实现WebSocket

    如何让我的服务器返回正确的Sec-WebSocket-Accept标头值

    学习WebSocket协议—从顶层到底层的实现原理

    websocket 协议帧 解析

    nodejs实现Websocket的数据接收发送

    相关文章

      网友评论

          本文标题:WebSocket 的实现

          本文链接:https://www.haomeiwen.com/subject/nmtwjktx.html