美文网首页
【网络编程】Netty之Websocket编解码

【网络编程】Netty之Websocket编解码

作者: 程就人生 | 来源:发表于2023-03-14 21:21 被阅读0次

    前几天,有人在简书上给我留言,三年前写的一个关于 WebSocket 的 demo,在传输文本遇到中文时乱码了。这次维护旧项目刚好把 WebSocket 撸一遍,顺便把网友说的发送文本的功能实现。先上个图,提前对本文做了什么有所了解。

    一、WebSocket 简介

    WebSocket 是由 Web + Socket组成,意思就是在 Web 浏览器上使用的 Socket。

    Web 浏览器一直围绕着 http 的请求/响应模式,请求一次响应一次。如果客户端要和服务器不停地进行交互怎么办?只能使用长轮询。长轮询还是离不开请求/响应模式,而且非常耗性能。

    WebSocket 就是将网络套接字引入到 Web 浏览器,在 Web 浏览器和服务器之间建立长连接,双方随时都可以发送数据给对方,代替原来的请求/响应模式。

    WebSocket 是 HTML5 提供的一种浏览器与服务器进行全双工通信的网络技术,目的就是取代长轮询。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。 HTTP 协议的特点和 WebSocket 的特点有必要再了解一下。HTTP 的特点:

    • HTTP 协议为半双工协议。半双工协议是指数据可以在客户端和服务器端两个方向上传输,但是不能同时传输。也就是在同一时刻,只有一个方向上的数据传送。
    • HTTP 消息包含消息头、消息体、换行符等等,通常又采用文本方式传输,相较于二进制通信协议,冗长繁琐。
    • 使用长轮询实现消息推送,需要浏览器和服务器之间不停交互,消息冗长,而且容易被黑客利用。

    WebSocket 的特点:

    • 单一的 TCP 连接,采用全双工模式通信;

    • 对代理、防火墙和路由器透明;

    • 无头部信息、Cookie 和身份验证;

    • 无安全开销;

    • 通过 “ping/pong” 帧保持链路激活;

    • 服务器可以主动传递消息给客户端,不需要客户端轮询。

    二、WebSocket 使用

    建立 WebSocket 的连接时,客户端浏览器首先向服务器发起一个 HTTP 请求。这个 HTTP 请求和普通的 HTTP 请求不同,包含一些附加头信息。附加头信息 “Upgrade:WebSocket” 表明这是一个申请协议升级的 HTTP 请求。

    服务器端会解析附加头信息,然后生成应答信息返回给浏览器,客户端浏览器和服务器的 WebSocket 连接就建立好了。双方可以通过这个通道自由地传递信息,并且这个连接会持续到浏览器或服务器的某一方主动关闭。

    客户端的重连机制。在服务端和客户端通信期间,如果发生网络故障,客户端和服务器端失联,这就需要客户端主动发起重连操作,直到连接服务器成功。

    链路的关闭。服务器端遇到客户端主动关闭和异常关闭的情况,就需要监听客户端链接的关闭,以便于根据业务及时作出响应。

    关于心跳。要保持客户端和服务器端的长连接,在没有数据发送时,需要使用心跳机制来维持长连接。

    三、服务器端编码

    在 61 行以匿名内部类的方法加入编解码处理器和业务处理器。

    在 67 行加入了 HttpServerCodec 编解码类,它继承了 CombinedChannelDuplexHandler 类,包含了 HttpRequestDecoder 解码类和 HttpResponseEncoder 编码类,并且实现了 HttpServerUpgradeHandler.SourceCodec 接口。

    HttpServerCodec = HttpRequestDecoder + HttpResponseEncoder + HttpServerUpgradeHandler.SourceCodec。

    到底是使用合成的编解码类,还是使用分开的编解码类,主要看业务需要。

    在 94 行,对服务器的启动进行监听。

    客户端第一次请求,发起的是 http 协议,运行到 138 行的判断里。升级后的WebSocket 发起的是 WebSocketFrame,运行到 141 行的判断里。

    在 handleHttpRequest 方法中,判断进来的 http 是不是升级到 websocket ,如果不是则返回 BAD_REQUEST 响应。如果是,则运行到 198 行,使用 WebSocketServerHandshakerFactory 构造握手信息并返回。

    在 handleWebSocketFrameRequest 方法中,对请求过来的 WebSocketFrame 进行处理。WebSocketFrame 是一个抽象类,其下还有 6 个子类。通过判断接收到的对象是哪个子类的实例来进行对应的处理。

    接下来看完整的代码。

    import java.net.InetSocketAddress;
    import java.util.concurrent.TimeUnit;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    /**
     * WebSocket编解码示例
     * @author 程就人生
     * @Date
     */
    public class WebSocketServer {
      
      private static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
      
      public void run(final int port) throws Exception{
        // 专门处理新的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 专门处理I/O读写事件
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try{
          // 服务器启动辅助类
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          serverBootstrap.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          // 以匿名内部类的方法加入编解码处理器和业务处理器
          .childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
              
              ChannelPipeline pipeline = ch.pipeline();
              // 首次请求为http协议,将请求和应答消息编码或解码成Http消息
              pipeline.addLast("http-codec", new HttpServerCodec());
              // 将http消息的多个部分组合成一条完整的http消息
              pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
              // 向客户端发送html5文件
              pipeline.addLast("http-chunked", new ChunkedWriteHandler());
              // 进行设置心跳检测
              pipeline.addLast(new IdleStateHandler(20,10,30, TimeUnit.SECONDS));
              // 业务处理类
              pipeline.addLast("handler", new WebSocketHandler());
            }        
          });
            ChannelFuture channelFuture = null;
              // 启动成功标识
              boolean startFlag = false;
              // 启动失败时,多次启动,直到启动成功为止
              while(!startFlag){
                  try{
                      channelFuture = serverBootstrap.bind(port).sync();
                      startFlag = true;
                  }catch(Exception e){
                      log.info("端口号:" + port + "已被占用!");
                      log.info("尝试一个新的端口:" + port + 1);
                      //重新便规定端口号
                      serverBootstrap.localAddress(new InetSocketAddress(port + 1));  
                  }           
              }
          //服务端启动监听事件
              channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                  public void operationComplete(Future<? super Void> future) throws Exception {
                      //启动成功后的处理
                      if (future.isSuccess()) {       
                         log.info("netty websocket 服务器端启动成功,Started Successed:{}", port);                     
                      } else {
                         log.info("netty websocket 服务器端启动失败,Started Failed:{}", port);
                      }
                  }
              }); 
              ChannelFuture closeFuture = channelFuture.channel().closeFuture();
                closeFuture.sync();
        }finally{
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
        }
      }
    
      public static void main(String[] argo){
        try {
          new WebSocketServer().run(8080);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
    /**
     * 业务处理类
     * @author 程就人生
     * @Date
     */
    class WebSocketHandler extends SimpleChannelInboundHandler<Object>{
      
        private static Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
        
        private WebSocketServerHandshaker handshaker;
        
        // 对请求消息计数
        private static int count = 0;
    
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 传统的http接入
        if(msg instanceof FullHttpRequest){
          handleHttpRequest(ctx, (FullHttpRequest) msg);
          // WebSocket接入
        } else if(msg instanceof WebSocketFrame){
          handleWebSocketFrameRequest(ctx, (WebSocketFrame) msg);
        }
      }
    
      private void handleWebSocketFrameRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否是关闭链路的指令
            if(frame instanceof CloseWebSocketFrame){
                log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
                handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
                return;
            }
            // 判断是否是ping消息
            if(frame instanceof PingWebSocketFrame){
                log.info("【ping】");
                PongWebSocketFrame pong = new PongWebSocketFrame(frame.content().retain());
                ctx.channel().writeAndFlush(pong);
                return ;
            }
            // 判断实时是pong消息
            if(frame instanceof PongWebSocketFrame){
                log.info("【pong】");
                return ;
            }
            // 判断是不是片段
            if(frame instanceof ContinuationWebSocketFrame){
                log.info("断续帧");
                return;
            }
    //        // 判断是不是发送的正常消息
    //        if(frame instanceof BinaryWebSocketFrame){
    //         // 交给下面的handler处理
    //            ctx.fireChannelRead(frame);
    //        }
            
            // 本例子只支持文本信息
            if(!(frame instanceof TextWebSocketFrame)){
                log.info("【不支持二进制】");
                throw new UnsupportedOperationException("不支持二进制");
            }
            
            count++;
            
            // 返回信息应答
            String request = ((TextWebSocketFrame)frame).text();
            log.info("接收到的信息:{}:{}", count, request);
            
            // 返回应答消息给客户端端
            ctx.channel().writeAndFlush(new TextWebSocketFrame("你好,客户端" + count));
      }
    
      private void handleHttpRequest(final ChannelHandlerContext ctx, FullHttpRequest req) {
            // http 解码失败,返回 HTTP 异常
            if(!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
                sendHttpResponse(ctx, (FullHttpRequest) req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
                return;
            }
            // 构造握手响应返回,可以通过url获取其他参数
            WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
                    "ws://"+req.headers().get("Host")+"/"+req.uri()+"",null,false
            );
            handshaker = factory.newHandshaker(req);
            if(handshaker == null){
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            }else{
                // 进行连接
                handshaker.handshake(ctx.channel(),req);
                // 添加监听事件
                ctx.channel().newSucceededFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                    public void operationComplete(Future<? super Void> future) throws Exception {
                        if (future.isSuccess()) {
                            log.info("加入一个websocket客户端{}!", ctx.channel().remoteAddress());
                        }
                    }
    
                });
            }
      }
    
      /**
       * 返回应答消息给浏览器
       * @param ctx
       * @param req
       * @param res
       */
      private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // 返回应答消息给客户端
        if(res.status().code()  != 200){
          ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
          res.content().writeBytes(buf);
          buf.release();
        }
        // 如果是非Keep-alive,关闭连接
        ChannelFuture channelFuture = ctx.channel().writeAndFlush(res);
        if(!ctx.channel().isActive() || res.status().code() != 200){
          channelFuture.addListener(ChannelFutureListener.CLOSE);
        }
      }
      
      /**
         * 这里是保持服务器与客户端长连接  进行心跳检测 避免连接断开
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if(evt instanceof IdleStateEvent){
                IdleStateEvent stateEvent = (IdleStateEvent) evt;
                PingWebSocketFrame ping = new PingWebSocketFrame();
                switch (stateEvent.state()){
                    //读空闲(服务器端)
                    case READER_IDLE:
                        log.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
                        ctx.writeAndFlush(ping);
                        break;
                        //写空闲(客户端)
                    case WRITER_IDLE:
                        log.info("【"+ctx.channel().remoteAddress()+"】写空闲(客户端)");
                        ctx.writeAndFlush(ping);
                        break;
                    case ALL_IDLE:
                        log.info("【"+ctx.channel().remoteAddress()+"】读写空闲");
                        ctx.writeAndFlush(ping);
                        break;
                }
            }
        }
        
        /**
         * 出现异常时
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            ctx.close();
            log.info("【"+ctx.channel().remoteAddress()+"】已关闭(服务器端)");
        }
        
        /**
         * 监听客户端正常关闭和异常关闭
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            ctx.close();
            log.info("【"+ctx.channel().remoteAddress()+"】监听到客户端关闭!");
        }
    }
    

    服务器端运行结果:

    四、客户端编码

    在客户端,通过 window.WebSocket 代码判断当前浏览器是否支持WebSocket 。如果支持 WebSocket ,在 32 行通过 new WebSocket("ws://localhost:8080/websocket") 初始化 WebSocket 对象,把 IP 和端口替换成服务器端的 IP 地址和端口。

    在 33 行通过 onmessage 函数监听服务端返回的数据。在 37 行通过 onopen 函数监听链接是否打开。在 41 行通过 onclose 函数监听链接的关闭。在 45 行通过 onerror 函数监听链路中的错误。

    当 WebSocket 的 readyState 等于 OPEN 时,链路正常。在链路正常的情况下,可以通过 send 方法给服务器端发送数据。


    一个简易的客户端页面。第一行是一个输入框,用来输入要发送给服务器端的文本。第二行的第 1 个按钮用于打开 WebSocket 链接,第 2 个按钮用于获取输入框的内容发送给服务器端,第 3 个按钮开启心跳发送,第 4 个按钮用于关闭 WebSocket链接。在链路发生异常时,客户端每隔 10s 重连一次服务器,直到连接上为止。下面看客户端完整的代码。

    <!DOCTYPE html>
    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>netty webSocket 测试</title>
    </head>
    <body style="text-align:center;">
       <input type="text" name="msg" id="msg" value="你好,服务器" />
       <br/>
       <br/>
       <button type="button" onclick="openWebSocket()" >打开WebSocket</button>
       <button type="button" onclick="send()" >发送消息</button>
       <button type="button" onclick="sendHeart()" >发送心跳信息</button>
       <button type="button" onclick="closeWebSocket()" >关闭WebSocket</button>
       <hr/>
       <h3>webSocket客户端与服务器之间的消息交互</h3>
       <div id="responseText" style="margin-left:40%;width:400px;height:300px;overflow:scroll;" ></div>
    </body>
    <script type="text/javascript" src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js" ></script>
    <script type="text/javascript">
      var webSocket;
      // 判断连接是否建立
      var isOpen;
      var tt;
      // 和服务器建立连接
      function openWebSocket(){
        // 判断当前浏览器是否支持WebSocket
        if(!window.WebSocket){
         window.WebSocket = window.MozWebSocket;
        }
       if(window.WebSocket){
        webSocket = new WebSocket("ws://localhost:8080/websocket");
        webSocket.onmessage = function(event){
            var html = "<span style='float:left;' >" + event.data + "</span>";
          $("#responseText").html($("#responseText").html() + "<br/>" + html);
        };
        webSocket.onopen = function(event){
          $("#responseText").html("打开WebSocket服务!");
          isOpen = true;
        };
        webSocket.onclose = function(event){
          $("#responseText").html($("#responseText").html() + "<br/> WebSocket关闭!");
          isOpen = false;
        }
        webSocket.onerror = function() {
                console.log('发生异常了');
                //出错后重新连接
                reconnect();
            };
        }
        else{
         alert("您所使用的浏览器不支持WebSocket协议!");  
        }
      }
      // 给服务器发送信息
      function send(){
       sendMsg($("#msg").val());  
      }
      function sendMsg(val){
       if(!window.WebSocket){
        return;
       }
       if(webSocket.readyState == WebSocket.OPEN){
         isOpen = true;
         webSocket.send(val);
         var html = "<span style='float:right;' >" + val + "</span>";
         $("#responseText").html($("#responseText").html() + "<br/>" + html);
       }else{
         isOpen = false;
       }
      }
      // 给服务器发送心跳信息
      function sendHeart(){
       setTimeout(sendHeart, 1000 * 40);
       sendMsg("给服务器发送心跳");
      }
      // 关闭WebSocket
      function closeWebSocket(){
       if(!window.WebSocket){
        return;
       }
       if(webSocket.readyState == WebSocket.OPEN){
        webSocket.close();
       }
      }
        //重新连接
      function reconnect() {
        if(isOpen) {
        return;
        };
        isOpen = true;
        //没连接上会一直重连,设置延迟避免请求过多
        tt && clearTimeout(tt);
        tt = setTimeout(function () {
        openWebSocket();
        isOpen = false;
        }, 10000);
      }
        //心跳检测
        var heartCheck = {
            timeout: 20000,
            timeoutObj: null,
            serverTimeoutObj: null,
            start: function(){
                  console.log(getNowTime() +" Socket 心跳检测");  
                var self = this;
                this.timeoutObj && clearTimeout(this.timeoutObj);
                this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
                this.timeoutObj = setTimeout(function(){
                    //这里发送一个心跳,后端收到后,返回一个心跳消息,
                    //onmessage拿到返回的心跳就说明连接正常
                    console.log(getNowTime() +' Socket 连接重试');
                    //socket.send("连接成功");
                    self.serverTimeoutObj = setTimeout(function() {
                        console.log(webSocket);
                        webSocket.close();
                    }, self.timeout);
                }, this.timeout)
            }
        }
    </script>
    </html>
    

    客户端运行结果,依次点击按钮。

    再看下服务器的控制台输出:

    通过截图可以看到服务器接收到了客户端发送的消息,也监听到了客户端连接的关闭。

    可能有朋友说,服务器端的心跳这样写不好吧。当然可以把心跳单独拎出去,与业务逻辑的处理进行解耦。干脆把业务逻辑也拎出去,这个类仅负责 WebSocket 的链接也是可以的。
    本文通过一个简易的 Demo ,把可以发送文本信息的 WebSocket 走了一遍,希望对你有帮助。

    相关文章

      网友评论

          本文标题:【网络编程】Netty之Websocket编解码

          本文链接:https://www.haomeiwen.com/subject/qemkrdtx.html