美文网首页
Websocket操作字节序 之 服务端

Websocket操作字节序 之 服务端

作者: 程就人生 | 来源:发表于2020-08-18 16:25 被阅读0次

    Websocket在JavaScript中操作字节序 之 客户端
    在上一篇文章中,把页面的websocket编码写好了,那么服务端又该如何实现呢?由于该文是在上上篇demo中修改的,所以不全的代码还请参照:SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端
    SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端

    第一,在pom文件中加入websocket的引用;

    <!-- websocket依赖 -->
            <dependency>
                <groupId>javax.websocket</groupId>
                <artifactId>javax.websocket-api</artifactId>
                <scope>provided</scope>
            </dependency>
    

    第二,编解码;
    websocket发出的请求,其协议是BinaryWebSocketFrame,和netty的client发出请求的协议是不一样的,这里需要使用BinaryWebSocketFrame协议接收;
    BinaryWebsocketDecoder解码类:

    package com.example.im.codec;
    
    import java.util.List;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.example.instant.ProtoInstant;
    import com.example.util.CharacterConvert;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
    
    /**
     * 解析websocket提交的二进制信息
     * @author 程就人生
     * @date 2019年11月27日
     * @Description
     *
     */
    @ChannelHandler.Sharable
    public class BinaryWebsocketDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {
    
        private static final Logger log = LoggerFactory.getLogger(BinaryWebsocketDecoder.class);
    
        @Override
        protected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame msg, List<Object> out) throws Exception {
            log.info("服务器接收到二进制消息,消息长度:[{}]", msg.content().capacity());
            //获取BinaryWebSocketFrame内容
             ByteBuf buf = msg.content();
    
            buf.markReaderIndex();
            //判断获取到的数据是否够字头,不沟通字头继续往下读
            //字头:1位,数据串总长度:2位
            if (buf.readableBytes() < ProtoInstant.FILED_LEN) {// 不够包头
                log.info("不够包头,继续读!");
                return;
            }
            //读取字头1位
            int fieldHead = CharacterConvert.byteToInt(buf.readByte());
            if(fieldHead != ProtoInstant.FIELD_HEAD){
                log.error("字头不对:" + ctx.channel().remoteAddress());
                ctx.close();
                return;
            }
            // 读取传送过来的消息的长度2位。
            int length = CharacterConvert.shortToInt(buf.readShort());
            if (length < 0) {// 非法数据,关闭连接
                log.error("数据长度为0,非法数据,关闭连接!" + ctx.channel().remoteAddress());
                ctx.close();
            }
            // 读到的消息体长度如果小于传送过来的消息长度,减去字头1位,数据长度2位
            int dataLength = length - ProtoInstant.FILED_LEN;
            if (dataLength > buf.readableBytes()) {
                // 重置读取位置
                buf.resetReaderIndex();
                return;
            }
    
            byte[] array;
            if (buf.hasArray()) {
                log.info("堆缓冲");
                // 堆缓冲
                ByteBuf slice = buf.slice();
                array = slice.array();
            } else {
                log.info("直接缓冲");
                // 直接缓冲
                array = new byte[dataLength];
                buf.readBytes(array, 0, dataLength);
            }
            if (array != null) {
                msg.retain();
                out.add(array);
            }
        }
    }
    

    在编码时,也需要处理一下,编码成websocket可以接收的类,BinaryWebsocketEncoder编码类:

    package com.example.im.codec;
    
    import java.util.List;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.example.instant.ProtoInstant;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
    
    /**
     * websocket消息的发送
     * @author 程就人生
     * @date 2019年12月6日
     * @Description
     *
     */
    public class BinaryWebsocketEncoder extends MessageToMessageEncoder<byte[]>{
    
        private static Logger log = LoggerFactory.getLogger(BinaryWebsocketEncoder.class);
    
        @Override
        protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
            ByteBuf byteBuf = Unpooled.directBuffer(ProtoInstant.FILED_LEN + msg.length);
            // 字头(1位)
            byteBuf.writeByte(ProtoInstant.FIELD_HEAD);
            // 数据长度(2位),字头1位+数据长度2位+数据位(包含校验1位)
            byteBuf.writeShort(ProtoInstant.FILED_LEN + msg.length);
            //消息体,包含我们要发送的数据
            byteBuf.writeBytes(msg);
            out.add(new BinaryWebSocketFrame(byteBuf));
            log.info("websocket二进制数据出站了了了.....");
        }
    }
    

    第三,websocket第一次连接http请求处理类;
    websocket请求服务器,有一个流程,先发起http请求打开连接,然后再转成tcp长连接;只有在第一次请求的时候,发起的是http短连接,连接成功后就转成tcp连接;所以,要先对第一次请求做处理。

    package com.example.im.handler;
    
    import org.springframework.stereotype.Component;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpHeaders;
    import io.netty.handler.codec.http.HttpRequest;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    /**
     * websocket连接操作,只是连接
     * @author 程就人生
     * @date 2019年11月22日
     * @Description
     *
     */
    @Component
    @ChannelHandler.Sharable
    public class WebsocketConnectHandler extends SimpleChannelInboundHandler<Object>{
    
        private WebSocketServerHandshaker handshaker;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            try{
                //http请求和tcp请求分开处理
                if(msg instanceof HttpRequest){
                    //连接后的处理
                    handlerHttpRequest(ctx,(HttpRequest) msg);
                }else{
                    //其他的操作交给下面的Handler操作去做
                    ctx.fireChannelRead(msg);
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    
        /**
         * wetsocket第一次连接握手
         * @param ctx
         * @param msg
         */
        @SuppressWarnings("deprecation")
        private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
    
            // http 解码失败,如果请求头不是升级到websocket,则响应403
            if(!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
                sendHttpResponse(ctx, (FullHttpRequest) req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
            }
            //可以通过url获取其他参数
            WebSocketServerHandshakerFactory factory =
                    new WebSocketServerHandshakerFactory("ws://"+req.headers().get("Host")+"/"+req.getUri()+"",null,false
            );
    
            handshaker = factory.newHandshaker(req);
    
            Channel channel = ctx.channel();
            if(handshaker == null){
                WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(channel);
            }else{
                //进行连接,握手
                handshaker.handshake(channel, req);
                //TODO
                
                //添加监听事件
                channel.newSucceededFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                    @Override
                    public void operationComplete(Future<? super Void> future) throws Exception {
                        if (future.isSuccess()) {
    
                        }
                    }
    
                });
            }
        }
    
        @SuppressWarnings("deprecation")
        private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
            // 返回应答给客户端
            if (res.getStatus().code() != 200) {
                ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
            }
            // 如果是非Keep-Alive,关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    
    }
    

    第三,由于后台是对两种协议请求进行处理,这里加了一个入口类;
    入口类,主要是区分发起请求的客户端,是websocket客户端,还是netty的客户端,请求的客户端不一样,协议不一样,编解码也不一样。
    入口类EntranceHandler :

    package com.example.im.handler;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    import com.example.instant.ProtoInstant;
    import com.example.util.CharacterConvert;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * 总入口handler(websocket、netty client的判断)
     * @author 程就人生
     * @date 2019年12月5日
     * @Description
     *
     */
    @Service("entranceHandler")
    @ChannelHandler.Sharable
    public class EntranceHandler extends SimpleChannelInboundHandler<Object>{
    
        private static Logger log = LoggerFactory.getLogger(EntranceHandler.class);
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("....首次连接的判断....");
            ByteBuf byteBuf = (ByteBuf) msg;
    
            // 标记一下当前的readIndex的位置
            byteBuf.markReaderIndex();
            // 判断包头长度
            if (byteBuf.readableBytes() < ProtoInstant.FILED_LEN) {// 不够包头
                return;
            }
            // 读取字头
            int head = CharacterConvert.byteToInt(byteBuf.readByte());
            byteBuf.resetReaderIndex();
            if (head == ProtoInstant.FIELD_HEAD) {
                log.info("....节点间的/netty client的连接建立....");
                //http请求响应
                ctx.channel().pipeline().remove("httpRequestDecoder");
                ctx.channel().pipeline().remove("httpResponseEncoder");
                //websocket连接的建立
                ctx.channel().pipeline().remove("websocketConnectHandler");
                //websocket编码解码
                ctx.channel().pipeline().remove("binaryDeCoder");
                ctx.channel().pipeline().remove("binaryEncoder");
            } else {
                log.info("....websocket连接建立....");
                ctx.channel().pipeline().remove("bdeCoder");
                ctx.channel().pipeline().remove("benCoder");
    
            }
            byteBuf.retain();
            ctx.fireChannelRead(byteBuf);
            //删除首次连接通道,一个连接只用到一次
            ctx.channel().pipeline().remove(this);
        }
    }
    

    第四,netty服务端加入编解码及处理类;

    package com.example.im;
    
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.UnknownHostException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import com.example.im.codec.BinaryWebsocketDecoder;
    import com.example.im.codec.BinaryWebsocketEncoder;
    import com.example.im.codec.ByteArrayDecoder;
    import com.example.im.codec.ByteArrayEncoder;
    import com.example.im.handler.EntranceHandler;
    import com.example.im.handler.ExceptionHandler;
    import com.example.im.handler.HeartBeatServerHandler;
    import com.example.im.handler.LoginRequestHandler;
    import com.example.im.handler.WebsocketConnectHandler;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    /**
     * netty dtu服务器
     * @author 程就人生
     * @date 2020年7月13日
     * @Description 
     *
     */
    @Component
    public class SocketServer {
                
        private static Logger log = LoggerFactory.getLogger(SocketServer.class);
        
        private final EventLoopGroup bossGroup = new NioEventLoopGroup();
        //开启两个线程池
        private final EventLoopGroup workGroup = new NioEventLoopGroup();
        
        //启动装饰类
        private final ServerBootstrap serverBootstrap = new ServerBootstrap();  
        
        //本机的ip地址
        private String ip;
        
        //启动端口获取
        @Value("${server.port}")
        private int port;
        
        @Autowired
        private LoginRequestHandler loginRequestHandler;
        
        @Autowired
        private ExceptionHandler exceptionHandler;
        
        @Autowired
        private WebsocketConnectHandler websocketConnectHandler;
        
        //入口handler
        @Autowired
        private EntranceHandler entranceHandler;
            
        /**
         * 启动服务
         * @param port
         */
        public void start(){
            
            try {
                //获取本机的ip地址
                ip = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e1) {
                e1.printStackTrace();
            }
            
            serverBootstrap.group(bossGroup,workGroup)
                  //非阻塞
                  .channel(NioServerSocketChannel.class)
                  //连接缓冲池的大小
                  .option(ChannelOption.SO_BACKLOG, 1024)
                   //设置通道Channel的分配器
                  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                  .childHandler(new ChannelInitializer<SocketChannel>(){
                      
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {              
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    
                    //总入口handler,区分是websocket连接还是netty client的连接
                    pipeline.addFirst("entranceHandler", entranceHandler);
    
                    //添加编解码和处理器(节点间通讯用)
                    pipeline.addLast("bdeCoder", new ByteArrayDecoder());
                    pipeline.addLast("benCoder", new ByteArrayEncoder());
    
                    // HttpServerCodec:将请求和应答消息解码为HTTP消息
                    pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
                    pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
    
                    //建立连接(web端http的握手)
                    pipeline.addLast("websocketConnectHandler", websocketConnectHandler);
    
                    //websocket解码、编码
                    pipeline.addLast("binaryDeCoder", new BinaryWebsocketDecoder());
                    pipeline.addLast("binaryEncoder", new BinaryWebsocketEncoder());
                    
                    //管理后台登录
                    pipeline.addLast("loginHandler", loginRequestHandler);
                    //心跳检测
                    pipeline.addLast("heartBeat", new HeartBeatServerHandler());      
                    //异常处理
                    pipeline.addLast("exception", exceptionHandler);
    
                }
            });
            
            ChannelFuture channelFuture = null;
            //启动成功标识
            boolean startFlag = false;
            //启动失败时,多次启动,直到启动成功为止
            while(!startFlag){
                try{
                    channelFuture = serverBootstrap.bind(port).sync();
                    startFlag = true;
                }catch(Exception e){
                    log.info("端口号:" + port + "已被占用!");
                    port++;
                    log.info("尝试一个新的端口:" + port);
                    //重新便规定端口号
                    serverBootstrap.localAddress(new InetSocketAddress(port));  
                }           
            }
            
            //服务端启动监听事件
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    //启动成功后的处理
                    if (future.isSuccess()) {       
                       log.info("netty dtu服务器启动成功,Started Successed:" + ip + ":" + port);                     
                    } else {
                       log.info("netty dtu服务器启动失败,Started Failed:" + ip + ":" + port);
                    }
                }
            }); 
            
            try {
                // 7 监听通道关闭事件
                // 应用程序会一直等待,直到channel关闭
                ChannelFuture closeFuture = channelFuture.channel().closeFuture();
                closeFuture.sync();
            } catch (Exception e) {
                e.printStackTrace();
                log.error("发生其他异常", e);
            } finally {
                // 8 优雅关闭EventLoopGroup,
                // 释放掉所有资源包括创建的线程
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
        
    }
    

    最后,测试,完美验收;
    分别启动客户端和服务端,点击客户端的按钮,发起websocket请求,看看能否登陆成功,并保持心跳。

    服务端接收到的信息
    客户端接收的信息

    相关文章

      网友评论

          本文标题:Websocket操作字节序 之 服务端

          本文链接:https://www.haomeiwen.com/subject/ufckjktx.html