美文网首页java
Netty实战六:Netty处理同一个端口上来的多条不同协议的数

Netty实战六:Netty处理同一个端口上来的多条不同协议的数

作者: 雪飘千里 | 来源:发表于2018-12-11 16:14 被阅读407次

    在实战三中,我们处理了同一个端口上来的2种不同协议的数据,项目上线后,运行良好,之后项目又需要添加一种数据协议,按照同样的方法处理再上线后,发现在网络很差的情况下,会有数据丢包现象。
    为了更加通用,针对项目进行了重构,对于netty处理也增加了不少优化。

    优化点:

    • 使用工厂模式,这样的话,就不需要好几个decoder和hander;
    • 通过创建多个nettyserver实例(监听不同端口),达到隔离不同数据协议;
    • 在decoder中存储socketChannel和协议的对应关系(HashMap),这样在handler中就可以通过channel来获取到协议类型,然后通过协议类型来创建工厂,通过工厂来处理具体数据;
    • 粘包/拆包处理,之前的处理方式因为对Netty byteBuf认识不足,所以在处理粘包时可能会对数据;

    重构之后,过两天就会上线,现在我们总共支持4种不同的数据协议(四种不同厂家的设备),就算还要继续增加,项目结构上也可以很快处理完成。

    1、Demo

    1、NettyServer.class

    package org.xxx.android.netty.server;
    import javax.annotation.PreDestroy;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.epoll.EpollChannelOption;
    import io.netty.channel.epoll.EpollEventLoopGroup;
    import io.netty.channel.epoll.EpollServerSocketChannel;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * Created by zhangkai on 2018/6/11.
     * NioEventLoopGroup → EpollEventLoopGroup
       NioEventLoop → EpollEventLoop
       NioServerSocketChannel → EpollServerSocketChannel
       NioSocketChannel → EpollSocketChannel
       @Component
     */
    public class NettyServer{
        private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        EventLoopGroup boss =null;
        EventLoopGroup worker =null;
        ChannelFuture future = null;
        //厂商编码
        Integer factoryCode=null;
    
        boolean epoll=true;
        int port;
        public NettyServer(Integer fc,int port){
            this.factoryCode=fc;
            this.port=port;
        }
    
        @PreDestroy
        public void stop(){
            if(future!=null){
                future.channel().close().addListener(ChannelFutureListener.CLOSE);
                future.awaitUninterruptibly();
                boss.shutdownGracefully();
                worker.shutdownGracefully();
                future=null;
                logger.info(" 服务关闭 ");
            }
        }
        public void start(){
            logger.info(" nettyServer 正在启动");
            
            if(epoll){
                logger.info(" nettyServer 使用epoll模式");
                boss = new EpollEventLoopGroup();
                worker = new EpollEventLoopGroup();
            }
            else{
                logger.info(" nettyServer 使用nio模式");
                boss = new NioEventLoopGroup();
                worker = new NioEventLoopGroup();
            }
            
            logger.info("netty服务器在["+this.port+"]端口启动监听");
            
            serverBootstrap.group(boss,worker)
                .option(ChannelOption.SO_BACKLOG,1024)
                .option(EpollChannelOption.SO_REUSEPORT, true)
                .handler(new LoggingHandler(LogLevel.INFO))
                .option(ChannelOption.TCP_NODELAY,true)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new NettyServerInitializer(this.factoryCode));
            
            if(epoll){
                serverBootstrap.channel(EpollServerSocketChannel.class);
            }else{
                serverBootstrap.channel(NioServerSocketChannel.class);
            }
            
            
            try{
                future = serverBootstrap.bind(this.port).sync();
                if(future.isSuccess()){
                    logger.info("nettyServer 完成启动 ");
                }
                // 等待服务端监听端口关闭
                future.channel().closeFuture().sync();
            }catch (Exception e){
                //boss.shutdownGracefully();
                //worker.shutdownGracefully();
                logger.info("nettyServer 启动时发生异常---------------{}",e);
                logger.info(e.getMessage());
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    

    2、NettyServerInitializer.class

    package org.xxx.android.netty.server;
    import java.util.concurrent.TimeUnit;
    import org.xxx.android.netty.NettyConstants;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.bytes.ByteArrayEncoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    /**
     * Created by zhangkai on 2018/6/11.
     */
    public class NettyServerInitializer extends ChannelInitializer<SocketChannel>{
        Integer factoryCode=null;
        public NettyServerInitializer(Integer fc){
            this.factoryCode=fc;
        }
        
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new IdleStateHandler(
                    NettyConstants.SERVER_READ_IDEL_TIME_OUT,
                    NettyConstants.SERVER_WRITE_IDEL_TIME_OUT,
                    NettyConstants.SERVER_ALL_IDEL_TIME_OUT,
                    TimeUnit.SECONDS));
            pipeline.addLast(new AcceptorIdleStateTrigger());
    
            pipeline.addLast(new StringEncoder());
            pipeline.addLast(new ByteArrayEncoder());
    
            pipeline.addLast(new NettyServerDecoder(this.factoryCode));
            pipeline.addLast(new NettyServerHandler());
        }
    }
    

    3、NettyServerDecoder.class

    package org.xxx.android.netty.server;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import io.netty.channel.socket.SocketChannel;
    import org.xxx.android.factory.util.FactoryMap;
    import org.xxx.android.factory.util.FactoryUtil;
    import org.xxx.android.factory.util.MessageUtil;
    import org.xxx.android.factory.vo.FactoryEnum;
    import org.xxx.android.netty.delegate.DecoderDelegate;
    import org.xxx.android.netty.server.decoder.IDecoder;
    import org.xxx.android.util.DataUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    public class NettyServerDecoder extends ByteToMessageDecoder {
        protected final Logger log = LoggerFactory.getLogger(getClass());
        /*
         * 记录设备登录次数
         */
        static volatile Map<Integer,Integer> timesMap=new ConcurrentHashMap<Integer,Integer>();
        /*
         * 解码器委托模式
         */
        DecoderDelegate decoderDelegate=null;
        
        Integer factoryCode=null;
        public NettyServerDecoder(Integer fc){
            this.factoryCode=fc;
            this.decoderDelegate=new DecoderDelegate();
        }
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
            try {
                in.retain();
                Channel channel=channelHandlerContext.channel();
                int hashCode=channel.hashCode();
                
                ByteBufToBytes reader = new ByteBufToBytes();
                byte[] byteData = reader.read(in);
                log.info("服务端接收到的原始消息为{}={}",hashCode,DataUtil.ByteArrToHexString(byteData));
                //根据通道获取厂商
                FactoryEnum channelFactory=null;
                if(this.factoryCode==null){
                    //AA、BB、CC未指名工厂,从消息中获取工厂
                    channelFactory=this.indentifyFromMsg(channel, byteData, in, list);
                }
                else{
                    channelFactory=FactoryEnum.codeOf(this.factoryCode);
                    FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
                }
                if(channelFactory==null){
                    log.info("设备{}消息未识别",hashCode);
                    return;
                }
                //获取解码器
                IDecoder decoder=decoderDelegate.getDelegate(channelFactory);
                if(decoder==null){
                    log.info("设备{}厂商{}解码器未配置",hashCode,channelFactory.toString());
                    return;
                }
                boolean complete = decoder.decoder(hashCode, byteData, in, list);
                if (!complete) {
                    log.info("未识别出完整消息,继续接收{}", DataUtil.ByteArrToHexString(byteData));
                    return;
                }
                
            }catch (Throwable e){
                log.error("解析出错{}",e);
            }
        }
        /*
         * 从消息中获取工厂
         */
        private FactoryEnum indentifyFromMsg(Channel channel, byte[] byteData, ByteBuf in,
                List<Object> list) {
            int hashCode=channel.hashCode();
            FactoryEnum channelFactory = FactoryUtil.indentifyByChannel(channel);
            if (channelFactory==null) {
                //根据数据识别出厂商
                channelFactory= MessageUtil.getMsgType(byteData);
                if (channelFactory == null) {
                    int times=1;
                    if(timesMap.containsKey(hashCode)){
                        times=timesMap.get(hashCode)+1;
                    }
                    if(times==5){
                        log.info("设备{}已登录5次,服务器关闭连接",hashCode);
                        timesMap.remove(hashCode);
                        //关闭通道
                        channel.close();
                        return null;
                    }
                    else{
                        timesMap.put(hashCode, times);
                    }
                    //厂商未能识别,继续接收
                    in.resetReaderIndex();
                    log.info("设备{}厂商未能识别,继续接收{}", hashCode,
                            DataUtil.ByteArrToHexString(byteData));
                }
                else{
                    //在decoder中存储socketChannel和协议的对应关系
                    FactoryMap.putChannelDecoder(hashCode, channelFactory.getCode());
                }
            } else {
                timesMap.remove(hashCode);
                log.info("从通道获取厂商成功:{}={}",
                        hashCode,
                        channelFactory.toString());
            }
            return channelFactory;
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            System.err.println("--------数据读异常----------: ");
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            System.err.println("--------数据读取完毕----------");
        }
        
    }
    

    4、NettyServerHandler.class

    package org.xxx.android.netty.server;
    
    import org.apache.commons.lang3.StringUtils;
    import org.xxx.android.factory.IFactory;
    import org.xxx.android.factory.util.FactoryMap;
    import org.xxx.android.factory.util.FactoryUtil;
    import org.xxx.android.factory.vo.FactoryEnum;
    import org.xxx.android.netty.delegate.FactoryDelegate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.socket.SocketChannel;
    /**
     * 多线程共享
     */
    @ChannelHandler.Sharable
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        public final Logger log = LoggerFactory.getLogger(getClass());
        /*
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            log.info("----客户端设备连接:{}", ctx);
            ctx.fireChannelActive();
        }
        */
        @Override
        public void channelInactive(ChannelHandlerContext chc) throws Exception {
            SocketChannel socketChannel = (SocketChannel) chc.channel();
    
            String clientId = FactoryMap.getDevNoByChannel(socketChannel);
            log.info("----客户端设备连接断开:{}", clientId);
            if (!StringUtils.isEmpty(clientId)) {
                FactoryMap.removeChannelByDevNo(clientId);
                FactoryMap.removeChannelDecoder(chc.channel().hashCode());
                FactoryMap.removeChannelFactory(chc.channel().hashCode());
                //客户端断开
                FactoryUtil.getFactoryService().syncNetworkStatus(clientId, 0);
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
            //System.err.println("--------数据读取完毕----------");
        }
        @Override
        public void channelRead(ChannelHandlerContext chc, Object message) throws Exception {
            try {
                Channel channel=chc.channel();
                //获取协议类型
                Integer channelFactory=FactoryMap.getDecoderByChannel(channel.hashCode());
                if(channelFactory==null){
                    log.info("解码器未能维护通道和工厂关系");
                    return;
                }
                FactoryEnum factoryEnum=FactoryEnum.codeOf(channelFactory);
                if (factoryEnum == null) {
                    log.info("解析消息失败,未识别消息所属厂家");
                    return;
                }
                this.factoryMessage(channel,message,factoryEnum);
                
            }catch (Exception e){
                log.error("处理业务消息失败,{}",e);
            }
        }
        void factoryMessage(Channel channel, Object msg,FactoryEnum factoryEnum) {
            //处理消息
            /*
            byte[] data = (byte[])message;
            log.info("{}{}接收到通道{}的原始消息=={}",
                    factoryEnum.getTitle(),
                    NettyMap.getDevNoByChannel(socketChannel),
                    socketChannel.hashCode(),
                    DataUtil.bytesToHexString(data));
            */
    
            IFactory factory=FactoryMap.getFactoryByChannel(channel.hashCode());
            SocketChannel socketChannel = (SocketChannel) channel;
            if(factory==null){
                //委托模式创建工厂
                factory = FactoryDelegate.createFactory(factoryEnum);
                //对接收到的消息进行处理
                factory.processMessage(socketChannel,msg);
                FactoryMap.putChannelFactory(socketChannel.hashCode(), factory);
            }
            else{
                //对接收到的消息进行处理
                factory.processMessage(socketChannel,msg);
            }
            log.info("{}={}",socketChannel.hashCode(),factory.getFactoryDevNo());
        }
    }
    

    5、SpringbootApplication.class

    package org.xxx.android;
    
    import org.xxx.android.netty.server.NettyServer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.embedded.ConfigurableEmbeddedServletContainer;
    import org.springframework.boot.context.embedded.EmbeddedServletContainerCustomizer;
    import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
    import org.springframework.scheduling.annotation.EnableScheduling;
    /**
    * 该注解指定项目为springboot,由此类当作程序入口
    * 自动装配 web 依赖的环境
    **/
    //@Slf4j
    @EnableJpaAuditing
    @EnableScheduling
    @SpringBootApplication
    public class SpringbootApplication implements CommandLineRunner,EmbeddedServletContainerCustomizer{
       @Value("${server.port}")
       int serverPort;
       
    
       @Value("${netty.startup}")
       int startupStartup;
       // 注入NettyServer
       @Autowired NettyServer nettyServer;
       @Autowired NettyServer yyNettyServer;
       
       public static void main(String[] args) {
           SpringApplication.run(SpringbootApplication.class, args);
       }
       @Override
       public void customize(ConfigurableEmbeddedServletContainer container) {
           container.setPort(serverPort);
       }
       @Override
       public void run(String... strings) {
           this.startNettyServer();
        }
        void startNettyServer() {
            if(startupStartup==1){
                this.nettyThreadStart(nettyServer);
                this.nettyThreadStart(yyNettyServer);
                Runtime.getRuntime().addShutdownHook(new Thread(){
                     @Override
                     public void run(){
                         stopNettyServer();
                     }
                });
           }
        }
        void stopNettyServer() {
            nettyServer.stop();
            yyNettyServer.stop();
        }
        void nettyThreadStart(final NettyServer ns) {
            Thread thread = new Thread(new Runnable(){
                @Override
                public void run() {
                    ns.start();
                }
            });
            thread.start();
        }
    }
    

    2、粘包/拆包解决思路

    基本思路就是不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包;
    若当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包

    • 定长——FixedLengthFrameDecoder
    • 分隔符——DelimiterBasedFrameDecoder
    • 基于长度的变长包——LengthFieldBasedFrameDecoder

    若当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接

    这里最重要的是就是,使用markReaderIndex标记读索引,使的多余的数据保留,继续等待后面的数据

            //BB数据
            //判断心跳
            if(isXyHeart(byteData)){
                list.add(byteData);
                return true;
            }
    
            //判断是否是开头
            if(isXYMsgHeader(byteData)){
                headIndexMap.remove(hashCode);
                int length = DataUtil.byteToInt(byteData[XyConstant.BUSINESS_RSP_MSG_FIELD.LEN.INDEX]);
                //整包
                if(length == byteData.length){
                    //判断校验和
                    if(!isCheckNum(byteData,length)){
                        log.error("兴元数据包校验和不通过{}!={}==={}",byteData[length-1]& FactoryConstant.BYTE_MASK, XyBusinessReqMsgUtil.getCheckSum(byteData)&FactoryConstant.BYTE_MASK,DataUtil.ByteArrayToString(byteData));
                    }else {
                        list.add(byteData);
                        return true;
                    }
                }
                if(length > byteData.length){
                    //半包,继续接收
                    in.resetReaderIndex();
                    return false;
                }
                if(length < byteData.length){
                    log.info("粘包=====接收数据大于帧长度{}>{}",byteData.toString(),length);
                    return dealStickyPackage(in, list, length);
                }
    
        public boolean dealStickyPackage(ByteBuf in, List<Object> list, int length) {
            //粘包,重置读索引
            in.resetReaderIndex();
            byte[] bytes = new byte[length];
            in.readBytes(bytes);
            //已接收到的完整包数据传给handler去处理
            list.add(bytes);
            //标记读索引,相当于清除当前读索引readIndex之前的数据
            //剩下的数据就是下一条数据的开头,继续等待接收
            in.markReaderIndex();
            return false;
        }
    

    相关文章

      网友评论

        本文标题:Netty实战六:Netty处理同一个端口上来的多条不同协议的数

        本文链接:https://www.haomeiwen.com/subject/nhixcqtx.html