美文网首页java 编程
Netty 模拟心跳通信

Netty 模拟心跳通信

作者: 真海ice | 来源:发表于2018-02-22 15:49 被阅读0次

    在长连接场景下有很多发送心跳测试的需求,如服务注册与实现等。

    一 、服务器端

    ServerNetty:心跳服务端
    ServerHeartBeatHandler:处理某个客户端的心跳通信

    package com.test.thread.netty.heartBeat;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    import com.test.thread.netty.MarshallingCodefactory;
    import com.test.thread.utils.Constant;
    
    /**
     * netty 模拟发送心跳的服务器端
     * @author zhb
     */
    public class ServerNetty {
        
        private int port;
        
        public ServerNetty(int port){
            this.port = port;
        }
        
        // netty 服务端启动
        public void action() throws InterruptedException{
            
            // 用来接收进来的连接
            EventLoopGroup bossGroup = new NioEventLoopGroup(); 
            // 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            
            try {
                // nio服务的启动类
                ServerBootstrap sbs = new ServerBootstrap();
                // 配置nio服务参数
                sbs.group(bossGroup, workerGroup)
                   .channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
                   .option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
                   .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
                   .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            
                            // marshalling 序列化对象的解码
                            socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
                            // marshalling 序列化对象的编码
                            socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
                            
                            // 处理接收到的请求
                            socketChannel.pipeline().addLast(new ServerHeartBeatHandler()); // 这里相当于过滤器,可以配置多个
                        }
                   });
                
                System.err.println("server 开启--------------");
                // 绑定端口,开始接受链接
                ChannelFuture cf = sbs.bind(port).sync();
                
                // 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务
                cf.channel().closeFuture().sync();
            } finally{
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }       
        }
            
        // 开启netty服务线程
        public static void main(String[] args) throws InterruptedException {
            new ServerNetty(Constant.serverSocketPort).action();
        }   
        
    }
    
    
    package com.test.thread.netty.heartBeat;
    
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.util.HashMap;
    
    /**
     * 处理具体的一个客户端的心跳处理
     * @author zhb
     */
    public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {
    
        // 允许接入的认证信息
        private static HashMap<String, Object> auth_map = new HashMap<String, Object>();
        
        private static final String SUCCESS_KEY = "auth_success_key";
        
        static{
            // 可配置多个
            auth_map.put("127.0.0.1", "1234");
            // 192.168.56.1
            auth_map.put("192.168.56.1", "1234");
        }
        
        public void channelRead(ChannelHandlerContext ctx, Object msg){
            System.out.println("--------------------------------------------");
            // 如果信息是字符串类型,就去做认证
            if(msg instanceof String){
                auth(ctx, msg);
            // 认证通过后的心跳信息   
            }else if(msg instanceof RequestInfo){
                
                handlerHeartBeatInfo(ctx, msg);
            }else{
                ctx.writeAndFlush("info error!").addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        /**
         * 处理检测客户端心跳信息
         * @param msg
         * @param msg2 
         */
        private void handlerHeartBeatInfo(ChannelHandlerContext ctx, Object msg) {
            
            RequestInfo info = (RequestInfo) msg;
            System.out.println("--------------------------------------------");
            System.out.println("当前主机ip为: " + info.getIp());
            System.out.println("当前主机cpu情况: ");
            HashMap<String, Object> cpu = info.getCpuPercMap();
            System.out.println("总使用率: " + cpu.get("combined"));
            System.out.println("用户使用率: " + cpu.get("user"));
            System.out.println("系统使用率: " + cpu.get("sys"));
            System.out.println("等待率: " + cpu.get("wait"));
            System.out.println("空闲率: " + cpu.get("idle"));
            
            System.out.println("当前主机memory情况: ");
            HashMap<String, Object> memory = info.getMemoryMap();
            System.out.println("内存总量: " + memory.get("total"));
            System.out.println("当前内存使用量: " + memory.get("used"));
            System.out.println("当前内存剩余量: " + memory.get("free"));
            // 返回心跳信息接收成功   
            ctx.writeAndFlush("info received!");
        }
        
        /**
         * 验证客户端的信息
         * @param ctx
         * @param msg
         */
        private void auth(ChannelHandlerContext ctx, Object msg) {
            
            String[]  authStr = ((String)msg).split(",");   
            String authKey = (String) auth_map.get(authStr[0]);     
            // 请求的ip对应的key和服务端的是否一致
            if(authKey != null && authKey.equals(authStr[1])){
                ctx.writeAndFlush(SUCCESS_KEY);
            }else{
                // 返回认证失败
                ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
            }
        }
            
        // 数据读取完毕的处理
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.err.println("服务端读取数据完毕");
        }
        
        // 出现异常的处理
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.err.println("server 读取数据出现异常");
            ctx.close();
        }
            
    }
    
    

    二、客户端

    ClientNetty:心跳测试客户端
    ClientHeartBeatHandler:客户端心跳测试处理类
    RequestInfo:发送心跳信息的实体类
    HeartBeatTask:发送心跳信息任务类

    package com.test.thread.netty.heartBeat;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.io.UnsupportedEncodingException;
    
    import com.test.thread.netty.MarshallingCodefactory;
    import com.test.thread.utils.Constant;
    
    /**
     * 客户端发送请求
     * @author zhb
     *
     */
    public class ClientNetty {
        
        // 要请求的服务器的ip地址
        private String ip;
        // 服务器的端口
        private int port;
        
        public ClientNetty(String ip, int port){
            this.ip = ip;
            this.port = port;
        }
        
        // 请求端主题
        private void action() throws InterruptedException, UnsupportedEncodingException {
            
            EventLoopGroup bossGroup = new NioEventLoopGroup();     
            Bootstrap bs = new Bootstrap();
            
            bs.group(bossGroup)
              .channel(NioSocketChannel.class)
              .option(ChannelOption.SO_KEEPALIVE, true)
              .handler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel socketChannel) throws Exception {
                      
                        // marshalling 序列化对象的解码
                        socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
                        // marshalling 序列化对象的编码
                        socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
                      
                        // 处理来自服务端的响应信息
                        socketChannel.pipeline().addLast(new ClientHeartBeatHandler());
                  }
             });
            
            // 客户端开启
            ChannelFuture cf = bs.connect(ip, port).sync();     
            // 等待直到连接中断
            cf.channel().closeFuture().sync();      
            bossGroup.shutdownGracefully();
        }
            
        public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException {
            new ClientNetty("127.0.0.1", Constant.serverSocketPort).action();
        }
            
    }
    
    
    package com.test.thread.netty.heartBeat;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    /**
     * 处理客户端的心跳测试
     * @author zhb
     *
     */
    public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {
        
        private InetAddress addr;
        
        private static final String SUCCESS_KEY = "auth_success_key";
        
        private ScheduledFuture<?> heartBeat;
        
        // 心跳发送执行
        private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 
    
        //当channel已激活就执行的方法
        public void channelActive(ChannelHandlerContext ctx) throws UnknownHostException{
            
            
            addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress();
            String key = "1234";
            
            // 相当于认证证书
            String auth = ip + "," + key;
            System.err.println("client 发送认证给你信息: " + auth);
            // 首先向服务器发送认证
            ctx.writeAndFlush(auth);
            
        }
        
        // 读取服务端响应的信息
        public void channelRead(ChannelHandlerContext ctx, Object msg){
            try {
                if(msg instanceof String){
                    System.err.println("client 收到服务端的响应信息:" +(String)msg);
                    if(((String)msg).equals(SUCCESS_KEY)){
                        // 如果服务器端认证成功,开始执行心跳信息 每5秒执行一次
                        heartBeat = scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx, addr), 0, 5, TimeUnit.SECONDS);
                    }
                }else{
                    System.err.println(msg);
                }
            } finally {
                // 没有返回信息,要释放msg
                ReferenceCountUtil.safeRelease(msg);
            }
        }
    
        
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.err.println("服务端出现异常");
            
            cause.printStackTrace();
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }   
    
        // 数据读取完毕的处理
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.err.println("服务端读取数据完毕");
        }
            
    }
    
    
    package com.test.thread.netty.heartBeat;
    
    import java.net.InetAddress;
    import java.util.HashMap;
    
    import org.hyperic.sigar.CpuPerc;
    import org.hyperic.sigar.Mem;
    import org.hyperic.sigar.Sigar;
    import org.hyperic.sigar.SigarException;
    
    import io.netty.channel.ChannelHandlerContext;
    /**
     * 客户端发送一个心跳信息的线程
     * @author zhb
     *
     */
    public class HeartBeatTask implements Runnable {
        
        private ChannelHandlerContext ctx;  
        private InetAddress addr;
        
        public HeartBeatTask(ChannelHandlerContext ctx, InetAddress addr){
            this.ctx = ctx;
            this.addr = addr;
        }
    
        @Override
        public void run() {     
            try {
                RequestInfo reqInfo = new RequestInfo();
                reqInfo.setIp(addr.getHostAddress());
                
                // 该类可以自行检查本地的系统相关参数,具体信息可以百度
                Sigar sigar = new Sigar();
                //cpu prec
                CpuPerc cpuPerc = sigar.getCpuPerc();
                HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
                cpuPercMap.put("combined", cpuPerc.getCombined());
                cpuPercMap.put("user", cpuPerc.getUser());
                cpuPercMap.put("sys", cpuPerc.getSys());
                cpuPercMap.put("wait", cpuPerc.getWait());
                cpuPercMap.put("idle", cpuPerc.getIdle());
                
                // memory
                Mem mem = sigar.getMem();
                HashMap<String, Object> memoryMap = new HashMap<String, Object>();
                memoryMap.put("total", mem.getTotal() / 1024L);
                memoryMap.put("used", mem.getUsed() / 1024L);
                memoryMap.put("free", mem.getFree() / 1024L);
                
                reqInfo.setCpuPercMap(cpuPercMap);
                reqInfo.setMemoryMap(memoryMap);
                // 发送心跳信息
                ctx.writeAndFlush(reqInfo);
                
            } catch (SigarException e) {
                e.printStackTrace();
            }
            
        }
        
    
    }
    
    
    package com.test.thread.netty.heartBeat; 
    
    import java.io.Serializable;
    import java.util.HashMap;
    /**
     * 心跳信息实体类
     * @author zhb
     * 
     */
    public class RequestInfo implements Serializable {
    
        private String ip ;
        // cpu的一些信息
        private HashMap<String, Object> cpuPercMap ;
        // 内存的一些信息
        private HashMap<String, Object> memoryMap;
        //.. other field
        
        public String getIp() {
            return ip;
        }
        public void setIp(String ip) {
            this.ip = ip;
        }
        public HashMap<String, Object> getCpuPercMap() {
            return cpuPercMap;
        }
        public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
            this.cpuPercMap = cpuPercMap;
        }
        public HashMap<String, Object> getMemoryMap() {
            return memoryMap;
        }
        public void setMemoryMap(HashMap<String, Object> memoryMap) {
            this.memoryMap = memoryMap;
        }   
        
    }
    
    

    相关文章

      网友评论

        本文标题:Netty 模拟心跳通信

        本文链接:https://www.haomeiwen.com/subject/sykztftx.html