美文网首页netty
netty搭建tcp服务器通信(解决粘包问题)

netty搭建tcp服务器通信(解决粘包问题)

作者: lannisiter | 来源:发表于2020-09-28 08:56 被阅读0次

    最近做的项目有需求跟硬件通信,使用tcp实现长连接,协议自己规定,于是后端决定选用netty来作为tcp服务器,这里简单说一下netty的工作流程。外部的数据传入netty服务器中,netty首先通过解码器对数据进行一次预处理(比如把字节转为字符串或对象来方便操作),接着把预处理后的数据转发给处理器,在处理器中执行业务逻辑,最后如果有必要返回数据给连接者,可以通过netty提供的channel发送。

    • netty—>decode—>handler

    首先是启动一个tcp服务器

    package server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    /**
     * @author lanni
     * @date 2020/8/19 23:05
     * @description
     **/
    public class TCPServer {
        public void run(int port) throws Exception {
            //创建线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //创建启动类
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ServerInitializer())
                        .option(ChannelOption.SO_BACKLOG, 256)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                // 绑定端口,开始接收进来的连接
                ChannelFuture f = b.bind(port).sync();
                // 等待服务器 socket 关闭 。
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
        public static void main(String[] args) {
            try {
                System.out.println("tcp服务器启动...");
                new TCPServer().run(8998);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    初始化解码器、处理器

    package server;
    
    import handler.CustomDecode;
    import handler.TCPServerHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    
    /**
     * @author lanni
     * @date 2020/8/22 11:58
     * @description
     **/
    public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().
                    addLast(new CustomDecode()).        //自定义解码器
                    addLast(new TCPServerHandler())     //自定义处理器
            ;
        }
    }
    

    解码器中解决tcp粘包问题,关于什么是粘包、拆包我就不做解释了,我这里直接上解决方案,这里我简单说一下我做的项目数据传输,规定数据格式:

    固定头部(2字节)+数据长度(4字节)+其它(17字节)+数据(可变长度)+crc校验码(2字节)+固定结尾(2字节)

    所以每次收到的数据包中包含了数据的长度,就以此长度来组装数据包传递给handler,这里注意看我的注释部分。

    import util.StringUtil;
    
    import java.util.List;
    
    /**
     * @Author lanni
     * @Date 2020/8/23 9:30
     * @Description
     **/
    public class CustomDecode extends ByteToMessageDecoder {
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
            int len = in.readableBytes();       //这里得到可读取的字节长度
            in.markReaderIndex();               //包头做标记位,后面可以重新回到数据包头开始读数据
            //有数据时开始读数据包
            if (len > 0) {
                byte[] src = new byte[len];
                in.readBytes(src);          //把数据读到字节数组中(读取完之后指针会到最后一个数据)
                in.resetReaderIndex();      //重置当前指针到标记位(包头)
                //验证首部为A5 5A,只接收首部正确的数据包,如果包头错误可以直接丢弃或关闭连接
                if ((src[0] & 0x000000ff) == 0xA5 && (src[1] & 0x000000ff) == 0x5A) {
                    //计算报文长度
                    byte[] data =  {src[3],src[2]};
                    String hexLen = StringUtil.byteArrayToHexString(data);
                    //这里计算出来的是数据长度的报文长度,需要加27个固定长度
                    int pLen = Integer.parseInt(hexLen, 16) + 27;
                    if (len < pLen) {
                        //当数据包的长度不够时直接return,netty在缓冲区有数据时会一直调用decode方法,所以我们只需要等待下一个数据包传输过来一起解析
                        return;
                    }
                    byte[] packet = new byte[pLen];
                    in.readBytes(packet,0,pLen);
                    out.add(packet);
                }else {
                    channelHandlerContext.close();
                }
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("连接异常:"+cause);
    //        ctx.close();
        }
    

    然后就是处理器,用于处理得到的数据包,这个大家可以自己编写逻辑。

    package handler;
    
    import cn.hutool.core.util.StrUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import util.StringUtil;
    /**
     * @Author lanni
     * @Date 2020/8/19 23:07
     * @Description
     **/
    public class TCPServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //这里msg就是从解码器中传来的数据,解码器传输过来是什么格式,这里直接转成对应的格式就可以
            byte[] src = (byte[]) msg;
            try {
                //这里做自己的业务逻辑
                
                
                //获取链接实例
                Channel channel = ctx.channel();
                //响应消息一定要这样去发送,只能使用字节传输
                //netty中发送数据需要把待发送的字节数组包装一下成为ByteBuf来发送
                byte[] dest = null;
                ByteBuf buf = Unpooled.copiedBuffer(dest);
                //数据冲刷
                ChannelFuture cf = channel.writeAndFlush(buf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // 当出现异常就关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    netty中当然还涉及到服务器主动发送消息给客户端,但是需要注意的是如果是主动发消息,有一个先决条件是需要知道客户端的唯一标识(id或其它标识),我们需要用一个map来保存好channel和这个标识的对应关系。我所做的项目是服务器来维护设备id和连接通道channel的对应关系。

    首先需要一个统一管理channel的类,这里有CHANNEL_POOLKEY_POOL两个map,是为了让id和channel能够互相对应起来,可能有人会想着只需要维护id—>channel的关系就可以了,但是可以看见上面在发生异常时所使用的处理方法exceptionCaught(ChannelHandlerContext ctx, Throwable cause)时,只能拿到channel,所以需要通过channel找到id来做出相应的操作。

    import io.netty.channel.Channel;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @author lanni
     * @date 2020/9/11 20:21
     *
     **/
    @Slf4j
    public class NettyChannelManager {
        /**
         * 保存连接 Channel 的地方
         */
        private static Map<String, Channel> CHANNEL_POOL = new ConcurrentHashMap<>();
        private static Map<Channel, String> KEY_POOL = new ConcurrentHashMap<>();
    
        /**
         * 添加 Channel
         *
         * @param key
         */
        public static void add(String key, Channel channel) {
            CHANNEL_POOL.put(key, channel);
            KEY_POOL.put(channel, key);
        }
    
        /**
         * 删除 Channel
         *
         * @param key
         */
        public static void remove(String key) {
            Channel channel = CHANNEL_POOL.get(key);
            if (channel == null) {
                return;
            }
            CHANNEL_POOL.remove(key);
            KEY_POOL.remove(channel);
        }
    
        /**
         * 删除并同步关闭连接
         *
         * @param key
         */
        public static void removeAndClose(String key) {
            Channel channel = CHANNEL_POOL.get(key);
            remove(key);
            if (channel != null) {
                // 关闭连接
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void removeAndClose(Channel channel) {
            String key = KEY_POOL.get(channel);
            removeAndClose(key);
        }
    
        /**
         * 获得 Channel
         *
         * @param key
         * @return String
         */
        public static Channel getChannel(String key) {
            return CHANNEL_POOL.get(key);
        }
    
        /**
         * 获得 key
         *
         * @param channel
         * @return Channel
         */
        public static String getKey(Channel channel) {
            return KEY_POOL.get(channel);
        }
    
        /**
         * 判断是否存在key
         * @author lanni
         * @date 2020/9/16 10:10
         * @param key
         * @return boolean
         **/
        public static boolean hasKey(String key) {
            return CHANNEL_POOL.containsKey(key);
        }
    
        /**
         * 判断是否存在channel
         * @author lanni
         * @date 2020/10/12 9:34
         * @param channel
         * @return boolean
         **/
        public static boolean hasChannel(Channel channel) {
            return KEY_POOL.containsKey(channel);
        }
    
    }
    

    我这里是在处理器中获取到设备的id,然后交给NettyChannelManager管理,当发生异常时关闭channel并移除对应的连接信息。

    package handler;
    
    import cn.hutool.core.util.StrUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import util.StringUtil;
    /**
     * @Author lanni
     * @Date 2020/8/19 23:07
     * @Description
     **/
    public class TCPServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //这里msg就是从解码器中传来的数据,解码器传输过来是什么格式,这里直接转成对应的格式就可以
            byte[] src = (byte[]) msg;
            try {
                // 从数据包中拿到设备id
                byte[] deviceId = new byte[17];
                System.arraycopy(src, 4, deviceId, 0, 17);
                String devId = StrUtil.str(deviceId, CharsetUtil.UTF_8);
                // 保存channel,key
                // deviceId为空时表示设备断线重连
                if (!NettyChannelManager.hasKey(devId)) {
                    NettyChannelManager.add(devId, ctx.channel());
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // 当出现异常就关闭连接
            cause.printStackTrace();
            log.error("发生异常:" + cause.getMessage());
            String devId = NettyChannelManager.getKey(ctx.channel());
            if (devId == null || "".equals(devId)) {
                return;
            }
            // 删除链接信息并关闭链接
            NettyChannelManager.removeAndClose(ctx.channel());
        }
    
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            String devId = NettyChannelManager.getKey(ctx.channel());
            if (devId == null || "".equals(devId)) {
                return;
            }
            // 删除链接信息并关闭链接
            NettyChannelManager.removeAndClose(ctx.channel());
        }
    
    }
    

    现在有了这样一个对应关系之后,如果我们想给客户端主动发送消息,那么我们只需要通过客户端的id拿到对应的channel就可以在任意位置发送数据。

            // 先准备好需要发送的数据
            byte[] pkg = 
            // 通过id获取netty连接通道channel
            Channel channel = NettyChannelManager.getChannel(deviceId);
            // 封装数据
            ByteBuf buf = Unpooled.copiedBuffer(pkg);
            // 把数据写入通道并发送
            channel.writeAndFlush(buf);
    

    结语:以上所说都是在单机环境下,如果说是分布式环境的话那么关于id-channel的维护就需要修改。我们可以使用spring session来代替这里的

    NettyChannelManager,只需要几个配置就能解决分布式的问题,当然也可以有其它的方案,我在这里就不列举了。

    相关文章

      网友评论

        本文标题:netty搭建tcp服务器通信(解决粘包问题)

        本文链接:https://www.haomeiwen.com/subject/kgyluktx.html