Websocket在JavaScript中操作字节序 之 客户端
在上一篇文章中,把页面的websocket编码写好了,那么服务端又该如何实现呢?由于该文是在上上篇demo中修改的,所以不全的代码还请参照:SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端
SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端
第一,在pom文件中加入websocket的引用;
<!-- websocket依赖 -->
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<scope>provided</scope>
</dependency>
第二,编解码;
websocket发出的请求,其协议是BinaryWebSocketFrame,和netty的client发出请求的协议是不一样的,这里需要使用BinaryWebSocketFrame协议接收;
BinaryWebsocketDecoder解码类:
package com.example.im.codec;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
/**
* 解析websocket提交的二进制信息
* @author 程就人生
* @date 2019年11月27日
* @Description
*
*/
@ChannelHandler.Sharable
public class BinaryWebsocketDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(BinaryWebsocketDecoder.class);
@Override
protected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame msg, List<Object> out) throws Exception {
log.info("服务器接收到二进制消息,消息长度:[{}]", msg.content().capacity());
//获取BinaryWebSocketFrame内容
ByteBuf buf = msg.content();
buf.markReaderIndex();
//判断获取到的数据是否够字头,不沟通字头继续往下读
//字头:1位,数据串总长度:2位
if (buf.readableBytes() < ProtoInstant.FILED_LEN) {// 不够包头
log.info("不够包头,继续读!");
return;
}
//读取字头1位
int fieldHead = CharacterConvert.byteToInt(buf.readByte());
if(fieldHead != ProtoInstant.FIELD_HEAD){
log.error("字头不对:" + ctx.channel().remoteAddress());
ctx.close();
return;
}
// 读取传送过来的消息的长度2位。
int length = CharacterConvert.shortToInt(buf.readShort());
if (length < 0) {// 非法数据,关闭连接
log.error("数据长度为0,非法数据,关闭连接!" + ctx.channel().remoteAddress());
ctx.close();
}
// 读到的消息体长度如果小于传送过来的消息长度,减去字头1位,数据长度2位
int dataLength = length - ProtoInstant.FILED_LEN;
if (dataLength > buf.readableBytes()) {
// 重置读取位置
buf.resetReaderIndex();
return;
}
byte[] array;
if (buf.hasArray()) {
log.info("堆缓冲");
// 堆缓冲
ByteBuf slice = buf.slice();
array = slice.array();
} else {
log.info("直接缓冲");
// 直接缓冲
array = new byte[dataLength];
buf.readBytes(array, 0, dataLength);
}
if (array != null) {
msg.retain();
out.add(array);
}
}
}
在编码时,也需要处理一下,编码成websocket可以接收的类,BinaryWebsocketEncoder编码类:
package com.example.im.codec;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.instant.ProtoInstant;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
/**
* websocket消息的发送
* @author 程就人生
* @date 2019年12月6日
* @Description
*
*/
public class BinaryWebsocketEncoder extends MessageToMessageEncoder<byte[]>{
private static Logger log = LoggerFactory.getLogger(BinaryWebsocketEncoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
ByteBuf byteBuf = Unpooled.directBuffer(ProtoInstant.FILED_LEN + msg.length);
// 字头(1位)
byteBuf.writeByte(ProtoInstant.FIELD_HEAD);
// 数据长度(2位),字头1位+数据长度2位+数据位(包含校验1位)
byteBuf.writeShort(ProtoInstant.FILED_LEN + msg.length);
//消息体,包含我们要发送的数据
byteBuf.writeBytes(msg);
out.add(new BinaryWebSocketFrame(byteBuf));
log.info("websocket二进制数据出站了了了.....");
}
}
第三,websocket第一次连接http请求处理类;
websocket请求服务器,有一个流程,先发起http请求打开连接,然后再转成tcp长连接;只有在第一次请求的时候,发起的是http短连接,连接成功后就转成tcp连接;所以,要先对第一次请求做处理。
package com.example.im.handler;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* websocket连接操作,只是连接
* @author 程就人生
* @date 2019年11月22日
* @Description
*
*/
@Component
@ChannelHandler.Sharable
public class WebsocketConnectHandler extends SimpleChannelInboundHandler<Object>{
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
//http请求和tcp请求分开处理
if(msg instanceof HttpRequest){
//连接后的处理
handlerHttpRequest(ctx,(HttpRequest) msg);
}else{
//其他的操作交给下面的Handler操作去做
ctx.fireChannelRead(msg);
}
}catch(Exception e){
e.printStackTrace();
}
}
/**
* wetsocket第一次连接握手
* @param ctx
* @param msg
*/
@SuppressWarnings("deprecation")
private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
// http 解码失败,如果请求头不是升级到websocket,则响应403
if(!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
sendHttpResponse(ctx, (FullHttpRequest) req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
}
//可以通过url获取其他参数
WebSocketServerHandshakerFactory factory =
new WebSocketServerHandshakerFactory("ws://"+req.headers().get("Host")+"/"+req.getUri()+"",null,false
);
handshaker = factory.newHandshaker(req);
Channel channel = ctx.channel();
if(handshaker == null){
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(channel);
}else{
//进行连接,握手
handshaker.handshake(channel, req);
//TODO
//添加监听事件
channel.newSucceededFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
}
}
});
}
}
@SuppressWarnings("deprecation")
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-Alive,关闭连接
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
第三,由于后台是对两种协议请求进行处理,这里加了一个入口类;
入口类,主要是区分发起请求的客户端,是websocket客户端,还是netty的客户端,请求的客户端不一样,协议不一样,编解码也不一样。
入口类EntranceHandler :
package com.example.im.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 总入口handler(websocket、netty client的判断)
* @author 程就人生
* @date 2019年12月5日
* @Description
*
*/
@Service("entranceHandler")
@ChannelHandler.Sharable
public class EntranceHandler extends SimpleChannelInboundHandler<Object>{
private static Logger log = LoggerFactory.getLogger(EntranceHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("....首次连接的判断....");
ByteBuf byteBuf = (ByteBuf) msg;
// 标记一下当前的readIndex的位置
byteBuf.markReaderIndex();
// 判断包头长度
if (byteBuf.readableBytes() < ProtoInstant.FILED_LEN) {// 不够包头
return;
}
// 读取字头
int head = CharacterConvert.byteToInt(byteBuf.readByte());
byteBuf.resetReaderIndex();
if (head == ProtoInstant.FIELD_HEAD) {
log.info("....节点间的/netty client的连接建立....");
//http请求响应
ctx.channel().pipeline().remove("httpRequestDecoder");
ctx.channel().pipeline().remove("httpResponseEncoder");
//websocket连接的建立
ctx.channel().pipeline().remove("websocketConnectHandler");
//websocket编码解码
ctx.channel().pipeline().remove("binaryDeCoder");
ctx.channel().pipeline().remove("binaryEncoder");
} else {
log.info("....websocket连接建立....");
ctx.channel().pipeline().remove("bdeCoder");
ctx.channel().pipeline().remove("benCoder");
}
byteBuf.retain();
ctx.fireChannelRead(byteBuf);
//删除首次连接通道,一个连接只用到一次
ctx.channel().pipeline().remove(this);
}
}
第四,netty服务端加入编解码及处理类;
package com.example.im;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.example.im.codec.BinaryWebsocketDecoder;
import com.example.im.codec.BinaryWebsocketEncoder;
import com.example.im.codec.ByteArrayDecoder;
import com.example.im.codec.ByteArrayEncoder;
import com.example.im.handler.EntranceHandler;
import com.example.im.handler.ExceptionHandler;
import com.example.im.handler.HeartBeatServerHandler;
import com.example.im.handler.LoginRequestHandler;
import com.example.im.handler.WebsocketConnectHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* netty dtu服务器
* @author 程就人生
* @date 2020年7月13日
* @Description
*
*/
@Component
public class SocketServer {
private static Logger log = LoggerFactory.getLogger(SocketServer.class);
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
//开启两个线程池
private final EventLoopGroup workGroup = new NioEventLoopGroup();
//启动装饰类
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
//本机的ip地址
private String ip;
//启动端口获取
@Value("${server.port}")
private int port;
@Autowired
private LoginRequestHandler loginRequestHandler;
@Autowired
private ExceptionHandler exceptionHandler;
@Autowired
private WebsocketConnectHandler websocketConnectHandler;
//入口handler
@Autowired
private EntranceHandler entranceHandler;
/**
* 启动服务
* @param port
*/
public void start(){
try {
//获取本机的ip地址
ip = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e1) {
e1.printStackTrace();
}
serverBootstrap.group(bossGroup,workGroup)
//非阻塞
.channel(NioServerSocketChannel.class)
//连接缓冲池的大小
.option(ChannelOption.SO_BACKLOG, 1024)
//设置通道Channel的分配器
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//总入口handler,区分是websocket连接还是netty client的连接
pipeline.addFirst("entranceHandler", entranceHandler);
//添加编解码和处理器(节点间通讯用)
pipeline.addLast("bdeCoder", new ByteArrayDecoder());
pipeline.addLast("benCoder", new ByteArrayEncoder());
// HttpServerCodec:将请求和应答消息解码为HTTP消息
pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
//建立连接(web端http的握手)
pipeline.addLast("websocketConnectHandler", websocketConnectHandler);
//websocket解码、编码
pipeline.addLast("binaryDeCoder", new BinaryWebsocketDecoder());
pipeline.addLast("binaryEncoder", new BinaryWebsocketEncoder());
//管理后台登录
pipeline.addLast("loginHandler", loginRequestHandler);
//心跳检测
pipeline.addLast("heartBeat", new HeartBeatServerHandler());
//异常处理
pipeline.addLast("exception", exceptionHandler);
}
});
ChannelFuture channelFuture = null;
//启动成功标识
boolean startFlag = false;
//启动失败时,多次启动,直到启动成功为止
while(!startFlag){
try{
channelFuture = serverBootstrap.bind(port).sync();
startFlag = true;
}catch(Exception e){
log.info("端口号:" + port + "已被占用!");
port++;
log.info("尝试一个新的端口:" + port);
//重新便规定端口号
serverBootstrap.localAddress(new InetSocketAddress(port));
}
}
//服务端启动监听事件
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
//启动成功后的处理
if (future.isSuccess()) {
log.info("netty dtu服务器启动成功,Started Successed:" + ip + ":" + port);
} else {
log.info("netty dtu服务器启动失败,Started Failed:" + ip + ":" + port);
}
}
});
try {
// 7 监听通道关闭事件
// 应用程序会一直等待,直到channel关闭
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
log.error("发生其他异常", e);
} finally {
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
最后,测试,完美验收;
分别启动客户端和服务端,点击客户端的按钮,发起websocket请求,看看能否登陆成功,并保持心跳。
客户端接收的信息
网友评论