美文网首页
【网络编程】Netty之Websocket编解码

【网络编程】Netty之Websocket编解码

作者: 程就人生 | 来源:发表于2023-03-14 21:21 被阅读0次

前几天,有人在简书上给我留言,三年前写的一个关于 WebSocket 的 demo,在传输文本遇到中文时乱码了。这次维护旧项目刚好把 WebSocket 撸一遍,顺便把网友说的发送文本的功能实现。先上个图,提前对本文做了什么有所了解。

一、WebSocket 简介

WebSocket 是由 Web + Socket组成,意思就是在 Web 浏览器上使用的 Socket。

Web 浏览器一直围绕着 http 的请求/响应模式,请求一次响应一次。如果客户端要和服务器不停地进行交互怎么办?只能使用长轮询。长轮询还是离不开请求/响应模式,而且非常耗性能。

WebSocket 就是将网络套接字引入到 Web 浏览器,在 Web 浏览器和服务器之间建立长连接,双方随时都可以发送数据给对方,代替原来的请求/响应模式。

WebSocket 是 HTML5 提供的一种浏览器与服务器进行全双工通信的网络技术,目的就是取代长轮询。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。 HTTP 协议的特点和 WebSocket 的特点有必要再了解一下。HTTP 的特点:

  • HTTP 协议为半双工协议。半双工协议是指数据可以在客户端和服务器端两个方向上传输,但是不能同时传输。也就是在同一时刻,只有一个方向上的数据传送。
  • HTTP 消息包含消息头、消息体、换行符等等,通常又采用文本方式传输,相较于二进制通信协议,冗长繁琐。
  • 使用长轮询实现消息推送,需要浏览器和服务器之间不停交互,消息冗长,而且容易被黑客利用。

WebSocket 的特点:

  • 单一的 TCP 连接,采用全双工模式通信;

  • 对代理、防火墙和路由器透明;

  • 无头部信息、Cookie 和身份验证;

  • 无安全开销;

  • 通过 “ping/pong” 帧保持链路激活;

  • 服务器可以主动传递消息给客户端,不需要客户端轮询。

二、WebSocket 使用

建立 WebSocket 的连接时,客户端浏览器首先向服务器发起一个 HTTP 请求。这个 HTTP 请求和普通的 HTTP 请求不同,包含一些附加头信息。附加头信息 “Upgrade:WebSocket” 表明这是一个申请协议升级的 HTTP 请求。

服务器端会解析附加头信息,然后生成应答信息返回给浏览器,客户端浏览器和服务器的 WebSocket 连接就建立好了。双方可以通过这个通道自由地传递信息,并且这个连接会持续到浏览器或服务器的某一方主动关闭。

客户端的重连机制。在服务端和客户端通信期间,如果发生网络故障,客户端和服务器端失联,这就需要客户端主动发起重连操作,直到连接服务器成功。

链路的关闭。服务器端遇到客户端主动关闭和异常关闭的情况,就需要监听客户端链接的关闭,以便于根据业务及时作出响应。

关于心跳。要保持客户端和服务器端的长连接,在没有数据发送时,需要使用心跳机制来维持长连接。

三、服务器端编码

在 61 行以匿名内部类的方法加入编解码处理器和业务处理器。

在 67 行加入了 HttpServerCodec 编解码类,它继承了 CombinedChannelDuplexHandler 类,包含了 HttpRequestDecoder 解码类和 HttpResponseEncoder 编码类,并且实现了 HttpServerUpgradeHandler.SourceCodec 接口。

HttpServerCodec = HttpRequestDecoder + HttpResponseEncoder + HttpServerUpgradeHandler.SourceCodec。

到底是使用合成的编解码类,还是使用分开的编解码类,主要看业务需要。

在 94 行,对服务器的启动进行监听。

客户端第一次请求,发起的是 http 协议,运行到 138 行的判断里。升级后的WebSocket 发起的是 WebSocketFrame,运行到 141 行的判断里。

在 handleHttpRequest 方法中,判断进来的 http 是不是升级到 websocket ,如果不是则返回 BAD_REQUEST 响应。如果是,则运行到 198 行,使用 WebSocketServerHandshakerFactory 构造握手信息并返回。

在 handleWebSocketFrameRequest 方法中,对请求过来的 WebSocketFrame 进行处理。WebSocketFrame 是一个抽象类,其下还有 6 个子类。通过判断接收到的对象是哪个子类的实例来进行对应的处理。

接下来看完整的代码。

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
 * WebSocket编解码示例
 * @author 程就人生
 * @Date
 */
public class WebSocketServer {
  
  private static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
  
  public void run(final int port) throws Exception{
    // 专门处理新的连接
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    // 专门处理I/O读写事件
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    try{
      // 服务器启动辅助类
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class)
      // 以匿名内部类的方法加入编解码处理器和业务处理器
      .childHandler(new ChannelInitializer<SocketChannel>(){
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          
          ChannelPipeline pipeline = ch.pipeline();
          // 首次请求为http协议,将请求和应答消息编码或解码成Http消息
          pipeline.addLast("http-codec", new HttpServerCodec());
          // 将http消息的多个部分组合成一条完整的http消息
          pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
          // 向客户端发送html5文件
          pipeline.addLast("http-chunked", new ChunkedWriteHandler());
          // 进行设置心跳检测
          pipeline.addLast(new IdleStateHandler(20,10,30, TimeUnit.SECONDS));
          // 业务处理类
          pipeline.addLast("handler", new WebSocketHandler());
        }        
      });
        ChannelFuture channelFuture = null;
          // 启动成功标识
          boolean startFlag = false;
          // 启动失败时,多次启动,直到启动成功为止
          while(!startFlag){
              try{
                  channelFuture = serverBootstrap.bind(port).sync();
                  startFlag = true;
              }catch(Exception e){
                  log.info("端口号:" + port + "已被占用!");
                  log.info("尝试一个新的端口:" + port + 1);
                  //重新便规定端口号
                  serverBootstrap.localAddress(new InetSocketAddress(port + 1));  
              }           
          }
      //服务端启动监听事件
          channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
              public void operationComplete(Future<? super Void> future) throws Exception {
                  //启动成功后的处理
                  if (future.isSuccess()) {       
                     log.info("netty websocket 服务器端启动成功,Started Successed:{}", port);                     
                  } else {
                     log.info("netty websocket 服务器端启动失败,Started Failed:{}", port);
                  }
              }
          }); 
          ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
    }finally{
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] argo){
    try {
      new WebSocketServer().run(8080);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
/**
 * 业务处理类
 * @author 程就人生
 * @Date
 */
class WebSocketHandler extends SimpleChannelInboundHandler<Object>{
  
    private static Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
    
    private WebSocketServerHandshaker handshaker;
    
    // 对请求消息计数
    private static int count = 0;

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 传统的http接入
    if(msg instanceof FullHttpRequest){
      handleHttpRequest(ctx, (FullHttpRequest) msg);
      // WebSocket接入
    } else if(msg instanceof WebSocketFrame){
      handleWebSocketFrameRequest(ctx, (WebSocketFrame) msg);
    }
  }

  private void handleWebSocketFrameRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
    // 判断是否是关闭链路的指令
        if(frame instanceof CloseWebSocketFrame){
            log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
            handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
            return;
        }
        // 判断是否是ping消息
        if(frame instanceof PingWebSocketFrame){
            log.info("【ping】");
            PongWebSocketFrame pong = new PongWebSocketFrame(frame.content().retain());
            ctx.channel().writeAndFlush(pong);
            return ;
        }
        // 判断实时是pong消息
        if(frame instanceof PongWebSocketFrame){
            log.info("【pong】");
            return ;
        }
        // 判断是不是片段
        if(frame instanceof ContinuationWebSocketFrame){
            log.info("断续帧");
            return;
        }
//        // 判断是不是发送的正常消息
//        if(frame instanceof BinaryWebSocketFrame){
//         // 交给下面的handler处理
//            ctx.fireChannelRead(frame);
//        }
        
        // 本例子只支持文本信息
        if(!(frame instanceof TextWebSocketFrame)){
            log.info("【不支持二进制】");
            throw new UnsupportedOperationException("不支持二进制");
        }
        
        count++;
        
        // 返回信息应答
        String request = ((TextWebSocketFrame)frame).text();
        log.info("接收到的信息:{}:{}", count, request);
        
        // 返回应答消息给客户端端
        ctx.channel().writeAndFlush(new TextWebSocketFrame("你好,客户端" + count));
  }

  private void handleHttpRequest(final ChannelHandlerContext ctx, FullHttpRequest req) {
        // http 解码失败,返回 HTTP 异常
        if(!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
            sendHttpResponse(ctx, (FullHttpRequest) req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // 构造握手响应返回,可以通过url获取其他参数
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
                "ws://"+req.headers().get("Host")+"/"+req.uri()+"",null,false
        );
        handshaker = factory.newHandshaker(req);
        if(handshaker == null){
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }else{
            // 进行连接
            handshaker.handshake(ctx.channel(),req);
            // 添加监听事件
            ctx.channel().newSucceededFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        log.info("加入一个websocket客户端{}!", ctx.channel().remoteAddress());
                    }
                }

            });
        }
  }

  /**
   * 返回应答消息给浏览器
   * @param ctx
   * @param req
   * @param res
   */
  private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
    // 返回应答消息给客户端
    if(res.status().code()  != 200){
      ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
      res.content().writeBytes(buf);
      buf.release();
    }
    // 如果是非Keep-alive,关闭连接
    ChannelFuture channelFuture = ctx.channel().writeAndFlush(res);
    if(!ctx.channel().isActive() || res.status().code() != 200){
      channelFuture.addListener(ChannelFutureListener.CLOSE);
    }
  }
  
  /**
     * 这里是保持服务器与客户端长连接  进行心跳检测 避免连接断开
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            PingWebSocketFrame ping = new PingWebSocketFrame();
            switch (stateEvent.state()){
                //读空闲(服务器端)
                case READER_IDLE:
                    log.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
                    ctx.writeAndFlush(ping);
                    break;
                    //写空闲(客户端)
                case WRITER_IDLE:
                    log.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
                    ctx.writeAndFlush(ping);
                    break;
                case ALL_IDLE:
                    log.info("【"+ctx.channel().remoteAddress()+"】读写空闲");
                    ctx.writeAndFlush(ping);
                    break;
            }
        }
    }
    
    /**
     * 出现异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
        log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
    }
    
    /**
     * 监听客户端正常关闭和异常关闭
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ctx.close();
        log.info("【"+ctx.channel().remoteAddress()+"】监听到客户端关闭!");
    }
}

服务器端运行结果:

四、客户端编码

在客户端,通过 window.WebSocket 代码判断当前浏览器是否支持WebSocket 。如果支持 WebSocket ,在 32 行通过 new WebSocket("ws://localhost:8080/websocket") 初始化 WebSocket 对象,把 IP 和端口替换成服务器端的 IP 地址和端口。

在 33 行通过 onmessage 函数监听服务端返回的数据。在 37 行通过 onopen 函数监听链接是否打开。在 41 行通过 onclose 函数监听链接的关闭。在 45 行通过 onerror 函数监听链路中的错误。

当 WebSocket 的 readyState 等于 OPEN 时,链路正常。在链路正常的情况下,可以通过 send 方法给服务器端发送数据。


一个简易的客户端页面。第一行是一个输入框,用来输入要发送给服务器端的文本。第二行的第 1 个按钮用于打开 WebSocket 链接,第 2 个按钮用于获取输入框的内容发送给服务器端,第 3 个按钮开启心跳发送,第 4 个按钮用于关闭 WebSocket链接。在链路发生异常时,客户端每隔 10s 重连一次服务器,直到连接上为止。下面看客户端完整的代码。

<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>netty webSocket 测试</title>
</head>
<body style="text-align:center;">
   <input type="text" name="msg" id="msg" value="你好,服务器" />
   <br/>
   <br/>
   <button type="button" onclick="openWebSocket()" >打开WebSocket</button>
   <button type="button" onclick="send()" >发送消息</button>
   <button type="button" onclick="sendHeart()" >发送心跳信息</button>
   <button type="button" onclick="closeWebSocket()" >关闭WebSocket</button>
   <hr/>
   <h3>webSocket客户端与服务器之间的消息交互</h3>
   <div id="responseText" style="margin-left:40%;width:400px;height:300px;overflow:scroll;" ></div>
</body>
<script type="text/javascript" src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js" ></script>
<script type="text/javascript">
  var webSocket;
  // 判断连接是否建立
  var isOpen;
  var tt;
  // 和服务器建立连接
  function openWebSocket(){
    // 判断当前浏览器是否支持WebSocket
    if(!window.WebSocket){
     window.WebSocket = window.MozWebSocket;
    }
   if(window.WebSocket){
    webSocket = new WebSocket("ws://localhost:8080/websocket");
    webSocket.onmessage = function(event){
        var html = "<span style='float:left;' >" + event.data + "</span>";
      $("#responseText").html($("#responseText").html() + "<br/>" + html);
    };
    webSocket.onopen = function(event){
      $("#responseText").html("打开WebSocket服务!");
      isOpen = true;
    };
    webSocket.onclose = function(event){
      $("#responseText").html($("#responseText").html() + "<br/> WebSocket关闭!");
      isOpen = false;
    }
    webSocket.onerror = function() {
            console.log('发生异常了');
            //出错后重新连接
            reconnect();
        };
    }
    else{
     alert("您所使用的浏览器不支持WebSocket协议!");  
    }
  }
  // 给服务器发送信息
  function send(){
   sendMsg($("#msg").val());  
  }
  function sendMsg(val){
   if(!window.WebSocket){
    return;
   }
   if(webSocket.readyState == WebSocket.OPEN){
     isOpen = true;
     webSocket.send(val);
     var html = "<span style='float:right;' >" + val + "</span>";
     $("#responseText").html($("#responseText").html() + "<br/>" + html);
   }else{
     isOpen = false;
   }
  }
  // 给服务器发送心跳信息
  function sendHeart(){
   setTimeout(sendHeart, 1000 * 40);
   sendMsg("给服务器发送心跳");
  }
  // 关闭WebSocket
  function closeWebSocket(){
   if(!window.WebSocket){
    return;
   }
   if(webSocket.readyState == WebSocket.OPEN){
    webSocket.close();
   }
  }
    //重新连接
  function reconnect() {
    if(isOpen) {
    return;
    };
    isOpen = true;
    //没连接上会一直重连,设置延迟避免请求过多
    tt && clearTimeout(tt);
    tt = setTimeout(function () {
    openWebSocket();
    isOpen = false;
    }, 10000);
  }
    //心跳检测
    var heartCheck = {
        timeout: 20000,
        timeoutObj: null,
        serverTimeoutObj: null,
        start: function(){
              console.log(getNowTime() +" Socket 心跳检测");  
            var self = this;
            this.timeoutObj && clearTimeout(this.timeoutObj);
            this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
            this.timeoutObj = setTimeout(function(){
                //这里发送一个心跳,后端收到后,返回一个心跳消息,
                //onmessage拿到返回的心跳就说明连接正常
                console.log(getNowTime() +' Socket 连接重试');
                //socket.send("连接成功");
                self.serverTimeoutObj = setTimeout(function() {
                    console.log(webSocket);
                    webSocket.close();
                }, self.timeout);
            }, this.timeout)
        }
    }
</script>
</html>

客户端运行结果,依次点击按钮。

再看下服务器的控制台输出:

通过截图可以看到服务器接收到了客户端发送的消息,也监听到了客户端连接的关闭。

可能有朋友说,服务器端的心跳这样写不好吧。当然可以把心跳单独拎出去,与业务逻辑的处理进行解耦。干脆把业务逻辑也拎出去,这个类仅负责 WebSocket 的链接也是可以的。
本文通过一个简易的 Demo ,把可以发送文本信息的 WebSocket 走了一遍,希望对你有帮助。

相关文章

网友评论

      本文标题:【网络编程】Netty之Websocket编解码

      本文链接:https://www.haomeiwen.com/subject/qemkrdtx.html