前几天,有人在简书上给我留言,三年前写的一个关于 WebSocket 的 demo,在传输文本遇到中文时乱码了。这次维护旧项目刚好把 WebSocket 撸一遍,顺便把网友说的发送文本的功能实现。先上个图,提前对本文做了什么有所了解。
![](https://img.haomeiwen.com/i3816895/fa845283692ee284.png)
一、WebSocket 简介
WebSocket 是由 Web + Socket组成,意思就是在 Web 浏览器上使用的 Socket。
Web 浏览器一直围绕着 http 的请求/响应模式,请求一次响应一次。如果客户端要和服务器不停地进行交互怎么办?只能使用长轮询。长轮询还是离不开请求/响应模式,而且非常耗性能。
WebSocket 就是将网络套接字引入到 Web 浏览器,在 Web 浏览器和服务器之间建立长连接,双方随时都可以发送数据给对方,代替原来的请求/响应模式。
WebSocket 是 HTML5 提供的一种浏览器与服务器进行全双工通信的网络技术,目的就是取代长轮询。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。 HTTP 协议的特点和 WebSocket 的特点有必要再了解一下。HTTP 的特点:
- HTTP 协议为半双工协议。半双工协议是指数据可以在客户端和服务器端两个方向上传输,但是不能同时传输。也就是在同一时刻,只有一个方向上的数据传送。
- HTTP 消息包含消息头、消息体、换行符等等,通常又采用文本方式传输,相较于二进制通信协议,冗长繁琐。
- 使用长轮询实现消息推送,需要浏览器和服务器之间不停交互,消息冗长,而且容易被黑客利用。
WebSocket 的特点:
-
单一的 TCP 连接,采用全双工模式通信;
-
对代理、防火墙和路由器透明;
-
无头部信息、Cookie 和身份验证;
-
无安全开销;
-
通过 “ping/pong” 帧保持链路激活;
-
服务器可以主动传递消息给客户端,不需要客户端轮询。
二、WebSocket 使用
建立 WebSocket 的连接时,客户端浏览器首先向服务器发起一个 HTTP 请求。这个 HTTP 请求和普通的 HTTP 请求不同,包含一些附加头信息。附加头信息 “Upgrade:WebSocket” 表明这是一个申请协议升级的 HTTP 请求。
服务器端会解析附加头信息,然后生成应答信息返回给浏览器,客户端浏览器和服务器的 WebSocket 连接就建立好了。双方可以通过这个通道自由地传递信息,并且这个连接会持续到浏览器或服务器的某一方主动关闭。
客户端的重连机制。在服务端和客户端通信期间,如果发生网络故障,客户端和服务器端失联,这就需要客户端主动发起重连操作,直到连接服务器成功。
链路的关闭。服务器端遇到客户端主动关闭和异常关闭的情况,就需要监听客户端链接的关闭,以便于根据业务及时作出响应。
关于心跳。要保持客户端和服务器端的长连接,在没有数据发送时,需要使用心跳机制来维持长连接。
三、服务器端编码
![](https://img.haomeiwen.com/i3816895/ea8541388132c74a.png)
在 61 行以匿名内部类的方法加入编解码处理器和业务处理器。
在 67 行加入了 HttpServerCodec 编解码类,它继承了 CombinedChannelDuplexHandler 类,包含了 HttpRequestDecoder 解码类和 HttpResponseEncoder 编码类,并且实现了 HttpServerUpgradeHandler.SourceCodec 接口。
HttpServerCodec = HttpRequestDecoder + HttpResponseEncoder + HttpServerUpgradeHandler.SourceCodec。
到底是使用合成的编解码类,还是使用分开的编解码类,主要看业务需要。
![](https://img.haomeiwen.com/i3816895/c40c4286a2efb0d2.png)
在 94 行,对服务器的启动进行监听。
![](https://img.haomeiwen.com/i3816895/9a6ba37820b06c6c.png)
客户端第一次请求,发起的是 http 协议,运行到 138 行的判断里。升级后的WebSocket 发起的是 WebSocketFrame,运行到 141 行的判断里。
![](https://img.haomeiwen.com/i3816895/9cffc5d5fd1cb269.png)
在 handleHttpRequest 方法中,判断进来的 http 是不是升级到 websocket ,如果不是则返回 BAD_REQUEST 响应。如果是,则运行到 198 行,使用 WebSocketServerHandshakerFactory 构造握手信息并返回。
![](https://img.haomeiwen.com/i3816895/df6fb45dd103b5c7.png)
在 handleWebSocketFrameRequest 方法中,对请求过来的 WebSocketFrame 进行处理。WebSocketFrame 是一个抽象类,其下还有 6 个子类。通过判断接收到的对象是哪个子类的实例来进行对应的处理。
![](https://img.haomeiwen.com/i3816895/42da24ed7c4dbbbc.png)
接下来看完整的代码。
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* WebSocket编解码示例
* @author 程就人生
* @Date
*/
public class WebSocketServer {
private static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
public void run(final int port) throws Exception{
// 专门处理新的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 专门处理I/O读写事件
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
// 服务器启动辅助类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 以匿名内部类的方法加入编解码处理器和业务处理器
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 首次请求为http协议,将请求和应答消息编码或解码成Http消息
pipeline.addLast("http-codec", new HttpServerCodec());
// 将http消息的多个部分组合成一条完整的http消息
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
// 向客户端发送html5文件
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
// 进行设置心跳检测
pipeline.addLast(new IdleStateHandler(20,10,30, TimeUnit.SECONDS));
// 业务处理类
pipeline.addLast("handler", new WebSocketHandler());
}
});
ChannelFuture channelFuture = null;
// 启动成功标识
boolean startFlag = false;
// 启动失败时,多次启动,直到启动成功为止
while(!startFlag){
try{
channelFuture = serverBootstrap.bind(port).sync();
startFlag = true;
}catch(Exception e){
log.info("端口号:" + port + "已被占用!");
log.info("尝试一个新的端口:" + port + 1);
//重新便规定端口号
serverBootstrap.localAddress(new InetSocketAddress(port + 1));
}
}
//服务端启动监听事件
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future<? super Void> future) throws Exception {
//启动成功后的处理
if (future.isSuccess()) {
log.info("netty websocket 服务器端启动成功,Started Successed:{}", port);
} else {
log.info("netty websocket 服务器端启动失败,Started Failed:{}", port);
}
}
});
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] argo){
try {
new WebSocketServer().run(8080);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 业务处理类
* @author 程就人生
* @Date
*/
class WebSocketHandler extends SimpleChannelInboundHandler<Object>{
private static Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
private WebSocketServerHandshaker handshaker;
// 对请求消息计数
private static int count = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 传统的http接入
if(msg instanceof FullHttpRequest){
handleHttpRequest(ctx, (FullHttpRequest) msg);
// WebSocket接入
} else if(msg instanceof WebSocketFrame){
handleWebSocketFrameRequest(ctx, (WebSocketFrame) msg);
}
}
private void handleWebSocketFrameRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否是关闭链路的指令
if(frame instanceof CloseWebSocketFrame){
log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
return;
}
// 判断是否是ping消息
if(frame instanceof PingWebSocketFrame){
log.info("【ping】");
PongWebSocketFrame pong = new PongWebSocketFrame(frame.content().retain());
ctx.channel().writeAndFlush(pong);
return ;
}
// 判断实时是pong消息
if(frame instanceof PongWebSocketFrame){
log.info("【pong】");
return ;
}
// 判断是不是片段
if(frame instanceof ContinuationWebSocketFrame){
log.info("断续帧");
return;
}
// // 判断是不是发送的正常消息
// if(frame instanceof BinaryWebSocketFrame){
// // 交给下面的handler处理
// ctx.fireChannelRead(frame);
// }
// 本例子只支持文本信息
if(!(frame instanceof TextWebSocketFrame)){
log.info("【不支持二进制】");
throw new UnsupportedOperationException("不支持二进制");
}
count++;
// 返回信息应答
String request = ((TextWebSocketFrame)frame).text();
log.info("接收到的信息:{}:{}", count, request);
// 返回应答消息给客户端端
ctx.channel().writeAndFlush(new TextWebSocketFrame("你好,客户端" + count));
}
private void handleHttpRequest(final ChannelHandlerContext ctx, FullHttpRequest req) {
// http 解码失败,返回 HTTP 异常
if(!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
sendHttpResponse(ctx, (FullHttpRequest) req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
return;
}
// 构造握手响应返回,可以通过url获取其他参数
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
"ws://"+req.headers().get("Host")+"/"+req.uri()+"",null,false
);
handshaker = factory.newHandshaker(req);
if(handshaker == null){
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}else{
// 进行连接
handshaker.handshake(ctx.channel(),req);
// 添加监听事件
ctx.channel().newSucceededFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
log.info("加入一个websocket客户端{}!", ctx.channel().remoteAddress());
}
}
});
}
}
/**
* 返回应答消息给浏览器
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// 返回应答消息给客户端
if(res.status().code() != 200){
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
// 如果是非Keep-alive,关闭连接
ChannelFuture channelFuture = ctx.channel().writeAndFlush(res);
if(!ctx.channel().isActive() || res.status().code() != 200){
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 这里是保持服务器与客户端长连接 进行心跳检测 避免连接断开
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent stateEvent = (IdleStateEvent) evt;
PingWebSocketFrame ping = new PingWebSocketFrame();
switch (stateEvent.state()){
//读空闲(服务器端)
case READER_IDLE:
log.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
ctx.writeAndFlush(ping);
break;
//写空闲(客户端)
case WRITER_IDLE:
log.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
ctx.writeAndFlush(ping);
break;
case ALL_IDLE:
log.info("【"+ctx.channel().remoteAddress()+"】读写空闲");
ctx.writeAndFlush(ping);
break;
}
}
}
/**
* 出现异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
}
/**
* 监听客户端正常关闭和异常关闭
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ctx.close();
log.info("【"+ctx.channel().remoteAddress()+"】监听到客户端关闭!");
}
}
服务器端运行结果:
![](https://img.haomeiwen.com/i3816895/7b52685befbc9b9e.png)
四、客户端编码
![](https://img.haomeiwen.com/i3816895/5344c55ad1311350.png)
在客户端,通过 window.WebSocket 代码判断当前浏览器是否支持WebSocket 。如果支持 WebSocket ,在 32 行通过 new WebSocket("ws://localhost:8080/websocket") 初始化 WebSocket 对象,把 IP 和端口替换成服务器端的 IP 地址和端口。
在 33 行通过 onmessage 函数监听服务端返回的数据。在 37 行通过 onopen 函数监听链接是否打开。在 41 行通过 onclose 函数监听链接的关闭。在 45 行通过 onerror 函数监听链路中的错误。
![](https://img.haomeiwen.com/i3816895/058a3d8ff1f9468a.png)
当 WebSocket 的 readyState 等于 OPEN 时,链路正常。在链路正常的情况下,可以通过 send 方法给服务器端发送数据。
![](https://img.haomeiwen.com/i3816895/6f328c24363e7835.png)
一个简易的客户端页面。第一行是一个输入框,用来输入要发送给服务器端的文本。第二行的第 1 个按钮用于打开 WebSocket 链接,第 2 个按钮用于获取输入框的内容发送给服务器端,第 3 个按钮开启心跳发送,第 4 个按钮用于关闭 WebSocket链接。在链路发生异常时,客户端每隔 10s 重连一次服务器,直到连接上为止。下面看客户端完整的代码。
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>netty webSocket 测试</title>
</head>
<body style="text-align:center;">
<input type="text" name="msg" id="msg" value="你好,服务器" />
<br/>
<br/>
<button type="button" onclick="openWebSocket()" >打开WebSocket</button>
<button type="button" onclick="send()" >发送消息</button>
<button type="button" onclick="sendHeart()" >发送心跳信息</button>
<button type="button" onclick="closeWebSocket()" >关闭WebSocket</button>
<hr/>
<h3>webSocket客户端与服务器之间的消息交互</h3>
<div id="responseText" style="margin-left:40%;width:400px;height:300px;overflow:scroll;" ></div>
</body>
<script type="text/javascript" src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js" ></script>
<script type="text/javascript">
var webSocket;
// 判断连接是否建立
var isOpen;
var tt;
// 和服务器建立连接
function openWebSocket(){
// 判断当前浏览器是否支持WebSocket
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}
if(window.WebSocket){
webSocket = new WebSocket("ws://localhost:8080/websocket");
webSocket.onmessage = function(event){
var html = "<span style='float:left;' >" + event.data + "</span>";
$("#responseText").html($("#responseText").html() + "<br/>" + html);
};
webSocket.onopen = function(event){
$("#responseText").html("打开WebSocket服务!");
isOpen = true;
};
webSocket.onclose = function(event){
$("#responseText").html($("#responseText").html() + "<br/> WebSocket关闭!");
isOpen = false;
}
webSocket.onerror = function() {
console.log('发生异常了');
//出错后重新连接
reconnect();
};
}
else{
alert("您所使用的浏览器不支持WebSocket协议!");
}
}
// 给服务器发送信息
function send(){
sendMsg($("#msg").val());
}
function sendMsg(val){
if(!window.WebSocket){
return;
}
if(webSocket.readyState == WebSocket.OPEN){
isOpen = true;
webSocket.send(val);
var html = "<span style='float:right;' >" + val + "</span>";
$("#responseText").html($("#responseText").html() + "<br/>" + html);
}else{
isOpen = false;
}
}
// 给服务器发送心跳信息
function sendHeart(){
setTimeout(sendHeart, 1000 * 40);
sendMsg("给服务器发送心跳");
}
// 关闭WebSocket
function closeWebSocket(){
if(!window.WebSocket){
return;
}
if(webSocket.readyState == WebSocket.OPEN){
webSocket.close();
}
}
//重新连接
function reconnect() {
if(isOpen) {
return;
};
isOpen = true;
//没连接上会一直重连,设置延迟避免请求过多
tt && clearTimeout(tt);
tt = setTimeout(function () {
openWebSocket();
isOpen = false;
}, 10000);
}
//心跳检测
var heartCheck = {
timeout: 20000,
timeoutObj: null,
serverTimeoutObj: null,
start: function(){
console.log(getNowTime() +" Socket 心跳检测");
var self = this;
this.timeoutObj && clearTimeout(this.timeoutObj);
this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
this.timeoutObj = setTimeout(function(){
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
console.log(getNowTime() +' Socket 连接重试');
//socket.send("连接成功");
self.serverTimeoutObj = setTimeout(function() {
console.log(webSocket);
webSocket.close();
}, self.timeout);
}, this.timeout)
}
}
</script>
</html>
客户端运行结果,依次点击按钮。
![](https://img.haomeiwen.com/i3816895/97093600c5330206.png)
再看下服务器的控制台输出:
![](https://img.haomeiwen.com/i3816895/f42a04df0962df5a.png)
通过截图可以看到服务器接收到了客户端发送的消息,也监听到了客户端连接的关闭。
可能有朋友说,服务器端的心跳这样写不好吧。当然可以把心跳单独拎出去,与业务逻辑的处理进行解耦。干脆把业务逻辑也拎出去,这个类仅负责 WebSocket 的链接也是可以的。
本文通过一个简易的 Demo ,把可以发送文本信息的 WebSocket 走了一遍,希望对你有帮助。
网友评论