美文网首页netty
Netty实战二:Netty服务端心跳检测

Netty实战二:Netty服务端心跳检测

作者: 雪飘千里 | 来源:发表于2018-07-21 17:20 被阅读0次

业务场景:做一个netty服务端,跟设备交互,设备使用socket连接服务端。

需要的注意的地方只有两个,一是:服务端心跳检测,二是:服务端粘包处理
NettyServer

public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup work = new NioEventLoopGroup();
   ChannelFuture future = null;
    @Resource
    private NettyConfig nettyConfig;
    @PreDestroy
    public void stop(){
        if(future!=null){
            future.channel().close().addListener(ChannelFutureListener.CLOSE);
            future.awaitUninterruptibly();
            boss.shutdownGracefully();
            work.shutdownGracefully();
            future=null;
            logger.info(" 服务关闭 ");
        }
    }
    public void start(){    
        logger.info(" nettyServer 正在启动");
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss,work)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .option(ChannelOption.TCP_NODELAY,true)
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new NettyServerInitializer());
            logger.info("netty服务器在["+port+"]端口启动监听");
        try{
            future = serverBootstrap.bind(port).sync();
            if(future.isSuccess()){
                logger.info("nettyServer 完成启动 ");
            }
            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();

        }catch (Exception e){
            logger.info("[出现异常] 释放资源,{}",e);
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

NettyServerInitializer


public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(
                Constants.SERVER_READ_IDEL_TIME_OUT,
                Constants.SERVER_WRITE_IDEL_TIME_OUT, 
                Constants.SERVER_ALL_IDEL_TIME_OUT,
                TimeUnit.SECONDS));
        pipeline.addLast(new AcceptorIdleStateTrigger());
        // 字符串解码 和 编码
        pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        pipeline.addLast("encoder", new ObjectEncoder());
        // 自己的逻辑Handler
        pipeline.addLast(new NettyServerHandler());
    }
}

AcceptorIdleStateTrigger

@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);

    //可以把loss_connect_time 放到AttributeMap中
    private int loss_connect_time = 0;

    private static DeviceWarnService deviceWarnService;

 @Override
    public void channelInactive(ChannelHandlerContext chc) throws  Exception{
        SocketChannel socketChannel = (SocketChannel) chc.channel();
        String clientId = NettyMap.getKeyByChannel(socketChannel);
        logger.info("----客户端设备连接断开:{}",clientId);
        if(!StringUtils.isEmpty(clientId)) {
            NettyMap.removeChannel(clientId);
            //客户端断开
            HttpUtil.syncNetworkStatus(clientId,0);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        SocketChannel socketChannel=(SocketChannel) ctx.channel();
        String clientId = NettyChannelMap.get(socketChannel);
        if(StringUtil.isEmpty(clientId)){
            return;
        }

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            IdleState state=event.state();
            if (state==IdleState.READER_IDLE) {
                //设备离线,更改设备状态,增加离线操作日志
                this.updateDeviceStatus(clientId,0,loss_connect_time);
                loss_connect_time++;
                logger.info(clientId+"客户端离线"+loss_connect_time+"周期了");
                if(loss_connect_time>=Constants.MAX_LOSS_CONNECT_TIME){
                    //客户端断连10分钟
                    logger.info("服务器主动关闭客户端链路--"+String.valueOf(Constants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"s没有"+ NettyChannelMap.get(clientId)+"的信息了");
                    //发送离线短信通知客户
                    this.sendDeviceOffLineMsg(clientId,0);
                    loss_connect_time=0;
                    NettyChannelMap.remove(clientId);
                    //服务端主动关闭channel,会触发com.vendor.netty.server.NettyServerHandler.channelInactive()方法
                    ctx.channel().close();
                }
            } else {
                //复位
                logger.info(clientId+"客户端恢复连接-----");
                loss_connect_time =0;
                super.userEventTriggered(ctx,evt);
            }
        } else {
            //复位
            logger.info(clientId+"客户端恢复连接=======");
            loss_connect_time =0;
            super.userEventTriggered(ctx,evt);
        }
    }


    private void updateDeviceStatus(String clientId, int status,int lossConnectTime) {
        if(deviceWarnService==null){
            deviceWarnService = ContextUtil.getBeanByName(DeviceWarnService.class, "deviceWarnService");
        }
        String factoryDevNo= Constants.getFactoryDevNo(clientId);
        if(lossConnectTime == 0 ) {
            //只添加离线操作日志
            deviceWarnService.updateDeviceNetworkStatusAndLog(factoryDevNo, status);
        }
    }

netty的demo网上一大把,这里就不详细解释了,这里只记录实际业务中遇到的问题。

服务端心跳检测:

在这个业务中,设备使用socket连接服务端,然后会有定期的心跳,同时,服务端要检测客户端是否在线,如果不在线,则发出告警,给后台服务发送报警日志,同时给先关人员发送短信邮件。问题在于,如果设备断网或者断电后,channelInactive并不会被触发,这时就需要服务端主动监控客户端连接。
有两个方案来处理这个问题:
第一个:使用redis来实现,每次设备发起心跳,server就更新一次redis,当设备断网超过一定时间,则redis中数据失效。这时候就认为设备失联,可以发送告警。
每次server重启,从数据库中读取所有设备号,然后储存在内存中,同时启动一个线程,定时根据设备号去redis中获取数据,如果有,则认为设备在线,如果没有,则设备失联
第二个:使用IdleStateHandler来实现。
IdleStateHandler中的三个参数解释如下:
1)readerIdleTime:为读超时时间;
2)writerIdleTime:为写超时时间;
3)allIdleTime:所有类型的超时时间;
这里最重要是的readerIdleTime,当设置了readerIdleTime以后,服务端server会每隔readerIdleTime时间去检查一次channelRead方法被调用的情况,如果在readerIdleTime时间内该channel上的channelRead()方法没有被触发,就会调用userEventTriggered方法。
最终项目中采用的是IdleStateHandler来实现,因为用起来实在是太方便了。
总之,不管客户端是用什么实现的,socket netttyclient websocket,服务端想要主动检测客户端是否在线,都需要心跳,事实上,用到socket的地方,大多都要实现心跳,只要客户端有心跳,那服务端检测客户端是否在线就可以使用IdleStateHandler

粘包拆包处理:

粘包拆包的概念,这里就不重复了,项目中最开始遇到的是粘包问题,因为交互命令都很短,同时数据格式是String,所以最开始只是使用String.split()来切割命令来解决拆包。但是后来遇到一个特殊命令,上报是消息超过了1024字节,这时候就发生了拆包现象,netty默认一次性只接受1024字节的数据,如果超过了,则会拆分。这时候就用到DelimiterBasedFrameDecoder了。

TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,一半采用如下四种方式:
1、消息长度固定,累计读取到消息长度总和为定长Len的报文之后即认为是读取到了一个完整的消息。计数器归位,重新读取。
2、将回车换行符作为消息结束符。
3、将特殊的分隔符作为消息分隔符
4、通过在消息头定义长度字段来标识消息总长度。
DelimiterBasedFrameDecoder属于第三种。业务中因为设备上client是被人家的代码,消息格式都是固定的,所以第1、2、4都不行,只能使用第三种。
DelimiterBasedFrameDecoder的参数:
maxFrameLength:解码的帧的最大长度
stripDelimiter:解码时是否去掉分隔符
failFast:为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
delimiter:分隔符

Netty 服务端创建参考资料
http://www.infoq.com/cn/articles/netty-server-create

相关文章

网友评论

    本文标题:Netty实战二:Netty服务端心跳检测

    本文链接:https://www.haomeiwen.com/subject/gqzgpftx.html