SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端
SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端
在上面的两篇文章中,使用SpringBoot2整合了基于DTU的TCP服务器,还可以进行客户端服务端通讯的模拟。但有一个场景未加考虑,那就是用户通过页面给DTU的服务端发送命令,不是通过后端,而是通过前端页面,这就用到了Websocket。
也曾使用Websocket写过即时通讯的客户端demo,那么使用Websocket如何操作字节序呢?不由得想起以前走过的一段弯路,在这段弯路里遇到了一个很好用的js工具类ByteBuffer,这个类就是为操作字节序而生的。
关于字节序,在一开始还不知道怎么称呼,称它为字节数组,这种叫法显然是错误的。正确的说法应该是字节序。不同数据类型的数据占用的字节大小是不一样的,不同数据类型的字节组成的一个序列。数组里面的数据类型则是同一种,字节序和字节数组根本就不是一个概念,千万不要再搞错了!
下面就看看在页面里如何使用websocket结合ByteBuffer发起及处理字节序的操作。
第一,web端页面代码;
页面上的代码很简单,调用封装好的websocket即可;流程也很简单,建立连接,发起登录操作,然后保持心跳;
<html xmlns:th="http://www.thymeleaf.org" xmlns:sec="http://www.thymeleaf.org/thymeleaf-extras-springsecurity5" >
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>测试</title>
</head>
<body>
<button onclick="clickWebsocket()" >按钮</button>
<script th:src="@{/js/websocket.js}"></script>
<script th:inline="javascript">
// webSocket连接
var webSocket;
function clickWebsocket(){
if (webSocket == null) { // 初次连接websocket
console.log("初次连接websocket...");
//进行登录操作
var param={"deviceId":2};
webSocket = new WsSocket(function () {
webSocket.login();
},param);
webSocket.createWebSocket();
} else {
console.log("webSocket连接已建立...");
}
}
</script>
</body>
</html>
第二,websocket核心代码;
这里对websocket做了一个封装,方便使用。建立连接、发起登录、保持心跳的代码都在这里。其他的业务逻辑根据需要也可以添加在这里。这段代码的核心,就是对字节序操作封包、解包,请求服务器时封包,处理服务器返回的数据时解包,随便加入一个byte、short、int类型的数据都很方便。
document.write("<script type='text/javascript' src='/js/ByteBuffer.js'></script>");
/**
* 字头,固定
*/
let FIELD_HEAD = 0XCC;
/**
* 长度(数据长度2位 + 1位校验位)
*/
let FILED_LEN = 3;
//心跳
let HEART_BEAT = 0;
//登录
let LOGIN = 1;
WsSocket = function (onOpen,param) {
this.socket = null;
//服务器地址
this.wsUrl = "ws://127.0.0.1:8502/websocket/";
//避免重复连接
this.lockReconnect = false;
//重连标识
this.reconnectFlag = null;
// 最大重连次数
this.maxReconnectNum = 3;
// 当前重连次数
this.currentReconnectNum = 0;
// 重连间隔,4s
this.reconnectTime = 4000;
// 心跳响应
this.heartCheckTimeoutObj = null;
// websocket默认是传输字符串的,需要改为arraybuffer二进制传输类型
this.binaryType = "arraybuffer";
// 连接建立后处理的方法
this.onOpen = onOpen;
//传递过来的参数
this.param = param;
// 对消息的处理过程
this.onMessage = function(evt) {};
// 建立连接
this.createWebSocket = function () {
try {
if (typeof (WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
console.log("您的浏览器支持WebSocket");
}
this.socket = new WebSocket(this.wsUrl);
//websocket默认是传输字符串的,需要改为arraybuffer二进制传输类型
this.socket.binaryType = this.binaryType;
//初始化
this.init();
} catch (e) {
this.reconnect();
}
};
//重新连接
this.reconnect = function () {
let _self = this;
if (this.lockReconnect) {
return;
}
this.lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多
this.reconnectFlag && clearTimeout(this.reconnectFlag);
this.reconnectFlag = setTimeout(function () {
if (_self.currentReconnectNum < _self.maxReconnectNum) { // 限制重连次数
console.log("第" + _self.currentReconnectNum + '次重连');
_self.currentReconnectNum++;
_self.createWebSocket();
} else { // 重连失败,弹出错误信息
console.log('重连失败!')
// TODO:页面弹出错误提示框
}
_self.lockReconnect = false;
}, _self.reconnectTime);
};
// websocket连接成功后,初始化配置
this.init = function () {
let _self = this;
this.socket.onclose = function () {
console.log('链接关闭');
_self.reconnect();
};
this.socket.onerror = function () {
console.log('发生异常了');
_self.reconnect();
};
this.socket.onopen = function () {
console.log("连接成功!");
_self.currentReconnectNum = 0; // 重置当前重连次数
_self.onOpen();
};
this.socket.onmessage = function (event) {
//把接收到的数据转换成ByteBuffer
var bytebuf = new ByteBuffer(event.data);
//var arr = bytebuf.byte().short().byte().byte().byte().unpack();//结尾调用解包方法;
var arr = bytebuf.byte().short().byte().unpack();//结尾调用解包方法;
console.log(arr);
if (arr[2] == LOGIN) { //登录成功
var arr1 = bytebuf.byte().byte().unpack();
console.log("已经成功登录~!" + arr1);
//登录成功后,开始发送心跳
_self.heartCheck();
}else if (arr[2] == HEART_BEAT) { // 消息类型是 [心跳响应]
//第二次读取
var arr1 = bytebuf.byte().byte().unpack();
console.log("HEART_BEAT" + arr1);
//继续发送心跳
_self.heartCheck();
}
};
};
// 登录操作
this.login = function() {
//组装消息
let reqMsg = new ByteBuffer();
//信息封装成二进制
reqMsg.byte(FIELD_HEAD).byte(LOGIN).byte(this.param.deviceId);
let dataLength = reqMsg.blength() + FILED_LEN; //加数据长度本身2位,加上校验位1位
//数据长度放在第二位
reqMsg.short(dataLength, 1);
//计算求和
let sum = this.sum([FIELD_HEAD, dataLength, LOGIN, this.param.deviceId]);
//取得低八位
let verify = this.getLow8(sum);
//校验位
reqMsg.byte(verify);
console.log("数据长度为:" + dataLength);
let buf = reqMsg.pack();
this.socket.send(buf);
};
//WebSocket心跳检测
this.heartCheck = function () {
let timeout = 50000; //心跳检测时间
let _self = this;
this.heartCheckTimeoutObj && clearTimeout(this.heartCheckTimeoutObj);
this.heartCheckTimeoutObj = setTimeout(function () {
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
console.log(_self.getNowTime() + ' socket心跳检测');
//发送心跳
let heartBeat = new ByteBuffer();
heartBeat.byte(FIELD_HEAD).byte(HEART_BEAT).byte(_self.param.deviceId);
let dataLength = heartBeat.blength() + FILED_LEN; //加数据长度本身2位,加上校验位1位
//数据长度放在第二位
heartBeat.short(dataLength, 1);
let sum = _self.sum([FIELD_HEAD, dataLength, HEART_BEAT, _self.param.deviceId]);
//获取低八位
let verify = _self.getLow8(sum);
//校验位
heartBeat.byte(verify);
console.log("数据长度为:" + dataLength);
let buf = heartBeat.pack();
_self.socket.send(buf);
}, timeout)
}
this.getNowTime = function () {
var myDate = new Date();
//获取当前年
var year = myDate.getFullYear();
//获取当前月
var month = myDate.getMonth() + 1;
//获取当前日
var date = myDate.getDate();
var h = myDate.getHours(); //获取当前小时数(0-23)
var m = myDate.getMinutes(); //获取当前分钟数(0-59)
var s = myDate.getSeconds();
return year + '-' + this.p(month) + "-" + this.p(date) + " " + this.p(h) + ':' + this.p(m) + ":" + this.p(s);
};
//处理时间的
this.p = function (s) {
return s < 10 ? '0' + s : s;
};
// int/byte/short型数据,低八位获取
this.getLow8 = function (number) {
return number & 0xff;
};
// 求和
this.sum = function (integers) {
let result = 0;
for (let i = 0; i < integers.length; i++) {
result += integers[i];
}
return result;
};
}
第三,这是ByteBuffer.js类,里面是对字节序的操作做了封装;
另外由于编码的需要,在里面加了blength()方法,用来返回已ByteBuffer的长度,这是很有必要的。
/*
* 构造方法
* 源码出处:https://github.com/play175/ByteBuffer
* @param org_buf 需要解包的二进制
* @param offset 指定数据在二进制的初始位置 默认是0
*/
ByteBuffer = function (arrayBuf, offset) {
var Type_Byte = 1;
var Type_Short = 2;
var Type_UShort = 3;
var Type_Int32 = 4;
var Type_UInt32 = 5;
var Type_String = 6;//变长字符串,前两个字节表示长度
var Type_VString = 7;//定长字符串
var Type_Int64 = 8;
var Type_Float = 9;
var Type_Double = 10;
var Type_ByteArray = 11;
var _org_buf = arrayBuf ? (arrayBuf.constructor == DataView ? arrayBuf : (arrayBuf.constructor == Uint8Array ? new DataView(arrayBuf.buffer, offset) : new DataView(arrayBuf, offset))) : new DataView(new Uint8Array([]).buffer);
var _offset = offset || 0;
var _list = [];
var _littleEndian = false;
//指定字节序 为BigEndian
this.bigEndian = function(){
_littleEndian = false;
return this;
};
//指定字节序 为LittleEndian
this.littleEndian = function(){
_littleEndian = true;
return this;
};
if (!ArrayBuffer.prototype.slice) {
ArrayBuffer.prototype.slice = function (start, end) {
var that = new Uint8Array(this);
if (end == undefined) end = that.length;
var result = new ArrayBuffer(end - start);
var resultArray = new Uint8Array(result);
for (var i = 0; i < resultArray.length; i++)
resultArray[i] = that[i + start];
return result;
}
}
function utf8Write(view, offset, str) {
var c = 0;
for (var i = 0, l = str.length; i < l; i++) {
c = str.charCodeAt(i);
if (c < 0x80) {
view.setUint8(offset++, c);
}
else if (c < 0x800) {
view.setUint8(offset++, 0xc0 | (c >> 6));
view.setUint8(offset++, 0x80 | (c & 0x3f));
}
else if (c < 0xd800 || c >= 0xe000) {
view.setUint8(offset++, 0xe0 | (c >> 12));
view.setUint8(offset++, 0x80 | (c >> 6) & 0x3f);
view.setUint8(offset++, 0x80 | (c & 0x3f));
}
else {
i++;
c = 0x10000 + (((c & 0x3ff) << 10) | (str.charCodeAt(i) & 0x3ff));
view.setUint8(offset++, 0xf0 | (c >> 18));
view.setUint8(offset++, 0x80 | (c >> 12) & 0x3f);
view.setUint8(offset++, 0x80 | (c >> 6) & 0x3f);
view.setUint8(offset++, 0x80 | (c & 0x3f));
}
}
}
function utf8Read(view, offset, length) {
var string = '', chr = 0;
for (var i = offset, end = offset + length; i < end; i++) {
var byte = view.getUint8(i);
if ((byte & 0x80) === 0x00) {
string += String.fromCharCode(byte);
continue;
}
if ((byte & 0xe0) === 0xc0) {
string += String.fromCharCode(
((byte & 0x0f) << 6) |
(view.getUint8(++i) & 0x3f)
);
continue;
}
if ((byte & 0xf0) === 0xe0) {
string += String.fromCharCode(
((byte & 0x0f) << 12) |
((view.getUint8(++i) & 0x3f) << 6) |
((view.getUint8(++i) & 0x3f) << 0)
);
continue;
}
if ((byte & 0xf8) === 0xf0) {
chr = ((byte & 0x07) << 18) |
((view.getUint8(++i) & 0x3f) << 12) |
((view.getUint8(++i) & 0x3f) << 6) |
((view.getUint8(++i) & 0x3f) << 0);
if (chr >= 0x010000) { // surrogate pair
chr -= 0x010000;
string += String.fromCharCode((chr >>> 10) + 0xD800, (chr & 0x3FF) + 0xDC00);
} else {
string += String.fromCharCode(chr);
}
continue;
}
throw new Error('Invalid byte ' + byte.toString(16));
}
return string;
}
function utf8Length(str) {
var c = 0, length = 0;
for (var i = 0, l = str.length; i < l; i++) {
c = str.charCodeAt(i);
if (c < 0x80) {
length += 1;
}
else if (c < 0x800) {
length += 2;
}
else if (c < 0xd800 || c >= 0xe000) {
length += 3;
}
else {
i++;
length += 4;
}
}
return length;
}
this.byte = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getUint8(_offset,_littleEndian));
_offset += 1;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_Byte, d: val, l: 1 });
_offset += 1;
}
return this;
};
this.short = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getInt16(_offset,_littleEndian));
_offset += 2;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_Short, d: val, l: 2 });
_offset += 2;
}
return this;
};
this.ushort = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getUint16(_offset,_littleEndian));
_offset += 2;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_UShort, d: val, l: 2 });
_offset += 2;
}
return this;
};
this.int32 = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getInt32(_offset,_littleEndian));
_offset += 4;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_Int32, d: val, l: 4 });
_offset += 4;
}
return this;
};
this.uint32 = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getUint32(_offset,_littleEndian));
_offset += 4;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_UInt32, d: val, l: 4 });
_offset += 4;
}
return this;
};
/**
* 新加的方法,获取bytebuffer的长度
*/
this.blength = function(){
return _offset;
};
/**
* 变长字符串 前4个字节表示字符串长度
**/
this.string = function (val, index) {
if (arguments.length == 0) {
var len = _org_buf.getInt32(_offset,_littleEndian);
_offset += 4;
_list.push(utf8Read(_org_buf,_offset,len));
_offset += len;
} else {
var len = 0;
if (val) {
len = utf8Length(val);
}
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_String, d: val, l: len });
_offset += len + 4;
}
return this;
};
/**
* 定长字符串 val为null时,读取定长字符串(需指定长度len)
**/
this.vstring = function (val, len, index) {
if (!len) {
throw new Error('vstring must got len argument');
return this;
}
if (val == undefined || val == null) {
var vlen = 0;//实际长度
for (var i = _offset; i < _offset + len; i++) {
if (_org_buf.getUint8(i) > 0) vlen++;
}
_list.push(utf8Read(_org_buf,_offset,vlen));
_offset += len;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_VString, d: val, l: len });
_offset += len;
}
return this;
};
this.int64 = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getFloat64(_offset,_littleEndian));
_offset += 8;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_Int64, d: val, l: 8 });
_offset += 8;
}
return this;
};
this.float = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getFloat32(_offset,_littleEndian));
_offset += 4;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_Float, d: val, l: 4 });
_offset += 4;
}
return this;
};
this.double = function (val, index) {
if (arguments.length == 0) {
_list.push(_org_buf.getFloat64(_offset,_littleEndian));
_offset += 8;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_Double, d: val, l: 8 });
_offset += 8;
}
return this;
};
/**
* 写入或读取一段字节数组
**/
this.byteArray = function (val, len, index) {
if (!len) {
throw new Error('byteArray must got len argument');
return this;
}
if (val == undefined || val == null) {
var arr = new Uint8Array(_org_buf.buffer.slice(_offset, _offset + len));
_list.push(arr);
_offset += len;
} else {
_list.splice(index != undefined ? index : _list.length, 0, { t: Type_ByteArray, d: val, l: len });
_offset += len;
}
return this;
};
/**
* 解包成数据数组
**/
this.unpack = function () {
return _list;
};
/**
* 打包成二进制,在前面加上4个字节表示包长
**/
this.packWithHead = function () {
return this.pack(true);
};
/**
* 打包成二进制
* @param ifHead 是否在前面加上4个字节表示包长
**/
this.pack = function (ifHead) {
_org_buf = new DataView(new ArrayBuffer((ifHead) ? _offset + 4 : _offset));
var offset = 0;
if (ifHead) {
_org_buf.setUint32(offset, _offset,_littleEndian);
offset += 4;
}
for (var i = 0; i < _list.length; i++) {
switch (_list[i].t) {
case Type_Byte:
_org_buf.setInt8(offset, _list[i].d);
offset += _list[i].l;
break;
case Type_Short:
_org_buf.setInt16(offset, _list[i].d,_littleEndian);
offset += _list[i].l;
break;
case Type_UShort:
_org_buf.setUint16(offset, _list[i].d,_littleEndian);
offset += _list[i].l;
break;
case Type_Int32:
_org_buf.setInt32(offset, _list[i].d,_littleEndian);
offset += _list[i].l;
break;
case Type_UInt32:
_org_buf.setUint32(offset, _list[i].d,_littleEndian);
offset += _list[i].l;
break;
case Type_String:
//前4个字节表示字符串长度
_org_buf.setUint32(offset, _list[i].l,_littleEndian);
offset += 4;
utf8Write(_org_buf,offset,_list[i].d);
offset += _list[i].l;
break;
case Type_VString:
utf8Write(_org_buf,offset,_list[i].d);
var vlen = utf8Length(_list[i].d);//字符串实际长度
//补齐\0
for (var j = offset + vlen; j < offset + _list[i].l; j++) {
_org_buf.setUint8(j, 0);
}
offset += _list[i].l;
break;
case Type_Int64:
_org_buf.setFloat64(offset, _list[i].d,_littleEndian);
offset += _list[i].l;
break;
case Type_Float:
_org_buf.setFloat32(offset, _list[i].d,_littleEndian);
offset += _list[i].l;
break;
case Type_Double:
_org_buf.setFloat64(offset, _list[i].d,_littleEndian);
offset += _list[i].l;
break;
case Type_ByteArray:
var indx = 0;
for (var j = offset; j < offset + _list[i].l; j++) {
if (indx < _list[i].d.length) {
_org_buf.setUint8(j, _list[i].d[indx]);
} else {//不够的话,后面补齐0x00
_org_buf.setUint8(j, 0);
}
indx++
}
offset += _list[i].l;
break;
}
}
return _org_buf.buffer;
};
/**
* 未读数据长度
**/
this.getAvailable = function () {
if (!_org_buf) return _offset;
return _org_buf.buffer.byteLength - _offset;
};
}
服务器端的代码,敬请期待下一篇文稿。
网友评论