美文网首页
Websocket操作字节序 之 服务端

Websocket操作字节序 之 服务端

作者: 程就人生 | 来源:发表于2020-08-18 16:25 被阅读0次

Websocket在JavaScript中操作字节序 之 客户端
在上一篇文章中,把页面的websocket编码写好了,那么服务端又该如何实现呢?由于该文是在上上篇demo中修改的,所以不全的代码还请参照:SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端
SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端

第一,在pom文件中加入websocket的引用;

<!-- websocket依赖 -->
        <dependency>
            <groupId>javax.websocket</groupId>
            <artifactId>javax.websocket-api</artifactId>
            <scope>provided</scope>
        </dependency>

第二,编解码;
websocket发出的请求,其协议是BinaryWebSocketFrame,和netty的client发出请求的协议是不一样的,这里需要使用BinaryWebSocketFrame协议接收;
BinaryWebsocketDecoder解码类:

package com.example.im.codec;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;

/**
 * 解析websocket提交的二进制信息
 * @author 程就人生
 * @date 2019年11月27日
 * @Description
 *
 */
@ChannelHandler.Sharable
public class BinaryWebsocketDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {

    private static final Logger log = LoggerFactory.getLogger(BinaryWebsocketDecoder.class);

    @Override
    protected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame msg, List<Object> out) throws Exception {
        log.info("服务器接收到二进制消息,消息长度:[{}]", msg.content().capacity());
        //获取BinaryWebSocketFrame内容
         ByteBuf buf = msg.content();

        buf.markReaderIndex();
        //判断获取到的数据是否够字头,不沟通字头继续往下读
        //字头:1位,数据串总长度:2位
        if (buf.readableBytes() < ProtoInstant.FILED_LEN) {// 不够包头
            log.info("不够包头,继续读!");
            return;
        }
        //读取字头1位
        int fieldHead = CharacterConvert.byteToInt(buf.readByte());
        if(fieldHead != ProtoInstant.FIELD_HEAD){
            log.error("字头不对:" + ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        // 读取传送过来的消息的长度2位。
        int length = CharacterConvert.shortToInt(buf.readShort());
        if (length < 0) {// 非法数据,关闭连接
            log.error("数据长度为0,非法数据,关闭连接!" + ctx.channel().remoteAddress());
            ctx.close();
        }
        // 读到的消息体长度如果小于传送过来的消息长度,减去字头1位,数据长度2位
        int dataLength = length - ProtoInstant.FILED_LEN;
        if (dataLength > buf.readableBytes()) {
            // 重置读取位置
            buf.resetReaderIndex();
            return;
        }

        byte[] array;
        if (buf.hasArray()) {
            log.info("堆缓冲");
            // 堆缓冲
            ByteBuf slice = buf.slice();
            array = slice.array();
        } else {
            log.info("直接缓冲");
            // 直接缓冲
            array = new byte[dataLength];
            buf.readBytes(array, 0, dataLength);
        }
        if (array != null) {
            msg.retain();
            out.add(array);
        }
    }
}

在编码时,也需要处理一下,编码成websocket可以接收的类,BinaryWebsocketEncoder编码类:

package com.example.im.codec;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.instant.ProtoInstant;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;

/**
 * websocket消息的发送
 * @author 程就人生
 * @date 2019年12月6日
 * @Description
 *
 */
public class BinaryWebsocketEncoder extends MessageToMessageEncoder<byte[]>{

    private static Logger log = LoggerFactory.getLogger(BinaryWebsocketEncoder.class);

    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
        ByteBuf byteBuf = Unpooled.directBuffer(ProtoInstant.FILED_LEN + msg.length);
        // 字头(1位)
        byteBuf.writeByte(ProtoInstant.FIELD_HEAD);
        // 数据长度(2位),字头1位+数据长度2位+数据位(包含校验1位)
        byteBuf.writeShort(ProtoInstant.FILED_LEN + msg.length);
        //消息体,包含我们要发送的数据
        byteBuf.writeBytes(msg);
        out.add(new BinaryWebSocketFrame(byteBuf));
        log.info("websocket二进制数据出站了了了.....");
    }
}

第三,websocket第一次连接http请求处理类;
websocket请求服务器,有一个流程,先发起http请求打开连接,然后再转成tcp长连接;只有在第一次请求的时候,发起的是http短连接,连接成功后就转成tcp连接;所以,要先对第一次请求做处理。

package com.example.im.handler;

import org.springframework.stereotype.Component;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * websocket连接操作,只是连接
 * @author 程就人生
 * @date 2019年11月22日
 * @Description
 *
 */
@Component
@ChannelHandler.Sharable
public class WebsocketConnectHandler extends SimpleChannelInboundHandler<Object>{

    private WebSocketServerHandshaker handshaker;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            //http请求和tcp请求分开处理
            if(msg instanceof HttpRequest){
                //连接后的处理
                handlerHttpRequest(ctx,(HttpRequest) msg);
            }else{
                //其他的操作交给下面的Handler操作去做
                ctx.fireChannelRead(msg);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    /**
     * wetsocket第一次连接握手
     * @param ctx
     * @param msg
     */
    @SuppressWarnings("deprecation")
    private void handlerHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {

        // http 解码失败,如果请求头不是升级到websocket,则响应403
        if(!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
            sendHttpResponse(ctx, (FullHttpRequest) req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST));
        }
        //可以通过url获取其他参数
        WebSocketServerHandshakerFactory factory =
                new WebSocketServerHandshakerFactory("ws://"+req.headers().get("Host")+"/"+req.getUri()+"",null,false
        );

        handshaker = factory.newHandshaker(req);

        Channel channel = ctx.channel();
        if(handshaker == null){
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(channel);
        }else{
            //进行连接,握手
            handshaker.handshake(channel, req);
            //TODO
            
            //添加监听事件
            channel.newSucceededFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {

                    }
                }

            });
        }
    }

    @SuppressWarnings("deprecation")
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回应答给客户端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 如果是非Keep-Alive,关闭连接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

}

第三,由于后台是对两种协议请求进行处理,这里加了一个入口类;
入口类,主要是区分发起请求的客户端,是websocket客户端,还是netty的客户端,请求的客户端不一样,协议不一样,编解码也不一样。
入口类EntranceHandler :

package com.example.im.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * 总入口handler(websocket、netty client的判断)
 * @author 程就人生
 * @date 2019年12月5日
 * @Description
 *
 */
@Service("entranceHandler")
@ChannelHandler.Sharable
public class EntranceHandler extends SimpleChannelInboundHandler<Object>{

    private static Logger log = LoggerFactory.getLogger(EntranceHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("....首次连接的判断....");
        ByteBuf byteBuf = (ByteBuf) msg;

        // 标记一下当前的readIndex的位置
        byteBuf.markReaderIndex();
        // 判断包头长度
        if (byteBuf.readableBytes() < ProtoInstant.FILED_LEN) {// 不够包头
            return;
        }
        // 读取字头
        int head = CharacterConvert.byteToInt(byteBuf.readByte());
        byteBuf.resetReaderIndex();
        if (head == ProtoInstant.FIELD_HEAD) {
            log.info("....节点间的/netty client的连接建立....");
            //http请求响应
            ctx.channel().pipeline().remove("httpRequestDecoder");
            ctx.channel().pipeline().remove("httpResponseEncoder");
            //websocket连接的建立
            ctx.channel().pipeline().remove("websocketConnectHandler");
            //websocket编码解码
            ctx.channel().pipeline().remove("binaryDeCoder");
            ctx.channel().pipeline().remove("binaryEncoder");
        } else {
            log.info("....websocket连接建立....");
            ctx.channel().pipeline().remove("bdeCoder");
            ctx.channel().pipeline().remove("benCoder");

        }
        byteBuf.retain();
        ctx.fireChannelRead(byteBuf);
        //删除首次连接通道,一个连接只用到一次
        ctx.channel().pipeline().remove(this);
    }
}

第四,netty服务端加入编解码及处理类;

package com.example.im;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.example.im.codec.BinaryWebsocketDecoder;
import com.example.im.codec.BinaryWebsocketEncoder;
import com.example.im.codec.ByteArrayDecoder;
import com.example.im.codec.ByteArrayEncoder;
import com.example.im.handler.EntranceHandler;
import com.example.im.handler.ExceptionHandler;
import com.example.im.handler.HeartBeatServerHandler;
import com.example.im.handler.LoginRequestHandler;
import com.example.im.handler.WebsocketConnectHandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * netty dtu服务器
 * @author 程就人生
 * @date 2020年7月13日
 * @Description 
 *
 */
@Component
public class SocketServer {
            
    private static Logger log = LoggerFactory.getLogger(SocketServer.class);
    
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    //开启两个线程池
    private final EventLoopGroup workGroup = new NioEventLoopGroup();
    
    //启动装饰类
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();  
    
    //本机的ip地址
    private String ip;
    
    //启动端口获取
    @Value("${server.port}")
    private int port;
    
    @Autowired
    private LoginRequestHandler loginRequestHandler;
    
    @Autowired
    private ExceptionHandler exceptionHandler;
    
    @Autowired
    private WebsocketConnectHandler websocketConnectHandler;
    
    //入口handler
    @Autowired
    private EntranceHandler entranceHandler;
        
    /**
     * 启动服务
     * @param port
     */
    public void start(){
        
        try {
            //获取本机的ip地址
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        }
        
        serverBootstrap.group(bossGroup,workGroup)
              //非阻塞
              .channel(NioServerSocketChannel.class)
              //连接缓冲池的大小
              .option(ChannelOption.SO_BACKLOG, 1024)
               //设置通道Channel的分配器
              .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
              .childHandler(new ChannelInitializer<SocketChannel>(){
                  
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {              
                ChannelPipeline pipeline = socketChannel.pipeline();
                
                //总入口handler,区分是websocket连接还是netty client的连接
                pipeline.addFirst("entranceHandler", entranceHandler);

                //添加编解码和处理器(节点间通讯用)
                pipeline.addLast("bdeCoder", new ByteArrayDecoder());
                pipeline.addLast("benCoder", new ByteArrayEncoder());

                // HttpServerCodec:将请求和应答消息解码为HTTP消息
                pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
                pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());

                //建立连接(web端http的握手)
                pipeline.addLast("websocketConnectHandler", websocketConnectHandler);

                //websocket解码、编码
                pipeline.addLast("binaryDeCoder", new BinaryWebsocketDecoder());
                pipeline.addLast("binaryEncoder", new BinaryWebsocketEncoder());
                
                //管理后台登录
                pipeline.addLast("loginHandler", loginRequestHandler);
                //心跳检测
                pipeline.addLast("heartBeat", new HeartBeatServerHandler());      
                //异常处理
                pipeline.addLast("exception", exceptionHandler);

            }
        });
        
        ChannelFuture channelFuture = null;
        //启动成功标识
        boolean startFlag = false;
        //启动失败时,多次启动,直到启动成功为止
        while(!startFlag){
            try{
                channelFuture = serverBootstrap.bind(port).sync();
                startFlag = true;
            }catch(Exception e){
                log.info("端口号:" + port + "已被占用!");
                port++;
                log.info("尝试一个新的端口:" + port);
                //重新便规定端口号
                serverBootstrap.localAddress(new InetSocketAddress(port));  
            }           
        }
        
        //服务端启动监听事件
        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                //启动成功后的处理
                if (future.isSuccess()) {       
                   log.info("netty dtu服务器启动成功,Started Successed:" + ip + ":" + port);                     
                } else {
                   log.info("netty dtu服务器启动失败,Started Failed:" + ip + ":" + port);
                }
            }
        }); 
        
        try {
            // 7 监听通道关闭事件
            // 应用程序会一直等待,直到channel关闭
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("发生其他异常", e);
        } finally {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    
}

最后,测试,完美验收;
分别启动客户端和服务端,点击客户端的按钮,发起websocket请求,看看能否登陆成功,并保持心跳。

服务端接收到的信息
客户端接收的信息

相关文章

网友评论

      本文标题:Websocket操作字节序 之 服务端

      本文链接:https://www.haomeiwen.com/subject/ufckjktx.html