在长连接场景下有很多发送心跳测试的需求,如服务注册与实现等。
一 、服务器端
ServerNetty:心跳服务端
ServerHeartBeatHandler:处理某个客户端的心跳通信
package com.test.thread.netty.heartBeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import com.test.thread.netty.MarshallingCodefactory;
import com.test.thread.utils.Constant;
/**
* netty 模拟发送心跳的服务器端
* @author zhb
*/
public class ServerNetty {
private int port;
public ServerNetty(int port){
this.port = port;
}
// netty 服务端启动
public void action() throws InterruptedException{
// 用来接收进来的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// nio服务的启动类
ServerBootstrap sbs = new ServerBootstrap();
// 配置nio服务参数
sbs.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
.option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// marshalling 序列化对象的解码
socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
// marshalling 序列化对象的编码
socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
// 处理接收到的请求
socketChannel.pipeline().addLast(new ServerHeartBeatHandler()); // 这里相当于过滤器,可以配置多个
}
});
System.err.println("server 开启--------------");
// 绑定端口,开始接受链接
ChannelFuture cf = sbs.bind(port).sync();
// 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务
cf.channel().closeFuture().sync();
} finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 开启netty服务线程
public static void main(String[] args) throws InterruptedException {
new ServerNetty(Constant.serverSocketPort).action();
}
}
package com.test.thread.netty.heartBeat;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.HashMap;
/**
* 处理具体的一个客户端的心跳处理
* @author zhb
*/
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {
// 允许接入的认证信息
private static HashMap<String, Object> auth_map = new HashMap<String, Object>();
private static final String SUCCESS_KEY = "auth_success_key";
static{
// 可配置多个
auth_map.put("127.0.0.1", "1234");
// 192.168.56.1
auth_map.put("192.168.56.1", "1234");
}
public void channelRead(ChannelHandlerContext ctx, Object msg){
System.out.println("--------------------------------------------");
// 如果信息是字符串类型,就去做认证
if(msg instanceof String){
auth(ctx, msg);
// 认证通过后的心跳信息
}else if(msg instanceof RequestInfo){
handlerHeartBeatInfo(ctx, msg);
}else{
ctx.writeAndFlush("info error!").addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 处理检测客户端心跳信息
* @param msg
* @param msg2
*/
private void handlerHeartBeatInfo(ChannelHandlerContext ctx, Object msg) {
RequestInfo info = (RequestInfo) msg;
System.out.println("--------------------------------------------");
System.out.println("当前主机ip为: " + info.getIp());
System.out.println("当前主机cpu情况: ");
HashMap<String, Object> cpu = info.getCpuPercMap();
System.out.println("总使用率: " + cpu.get("combined"));
System.out.println("用户使用率: " + cpu.get("user"));
System.out.println("系统使用率: " + cpu.get("sys"));
System.out.println("等待率: " + cpu.get("wait"));
System.out.println("空闲率: " + cpu.get("idle"));
System.out.println("当前主机memory情况: ");
HashMap<String, Object> memory = info.getMemoryMap();
System.out.println("内存总量: " + memory.get("total"));
System.out.println("当前内存使用量: " + memory.get("used"));
System.out.println("当前内存剩余量: " + memory.get("free"));
// 返回心跳信息接收成功
ctx.writeAndFlush("info received!");
}
/**
* 验证客户端的信息
* @param ctx
* @param msg
*/
private void auth(ChannelHandlerContext ctx, Object msg) {
String[] authStr = ((String)msg).split(",");
String authKey = (String) auth_map.get(authStr[0]);
// 请求的ip对应的key和服务端的是否一致
if(authKey != null && authKey.equals(authStr[1])){
ctx.writeAndFlush(SUCCESS_KEY);
}else{
// 返回认证失败
ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
}
}
// 数据读取完毕的处理
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.err.println("服务端读取数据完毕");
}
// 出现异常的处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("server 读取数据出现异常");
ctx.close();
}
}
二、客户端
ClientNetty:心跳测试客户端
ClientHeartBeatHandler:客户端心跳测试处理类
RequestInfo:发送心跳信息的实体类
HeartBeatTask:发送心跳信息任务类
package com.test.thread.netty.heartBeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.UnsupportedEncodingException;
import com.test.thread.netty.MarshallingCodefactory;
import com.test.thread.utils.Constant;
/**
* 客户端发送请求
* @author zhb
*
*/
public class ClientNetty {
// 要请求的服务器的ip地址
private String ip;
// 服务器的端口
private int port;
public ClientNetty(String ip, int port){
this.ip = ip;
this.port = port;
}
// 请求端主题
private void action() throws InterruptedException, UnsupportedEncodingException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
Bootstrap bs = new Bootstrap();
bs.group(bossGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// marshalling 序列化对象的解码
socketChannel.pipeline().addLast(MarshallingCodefactory.buildDecoder());
// marshalling 序列化对象的编码
socketChannel.pipeline().addLast(MarshallingCodefactory.buildEncoder());
// 处理来自服务端的响应信息
socketChannel.pipeline().addLast(new ClientHeartBeatHandler());
}
});
// 客户端开启
ChannelFuture cf = bs.connect(ip, port).sync();
// 等待直到连接中断
cf.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
}
public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException {
new ClientNetty("127.0.0.1", Constant.serverSocketPort).action();
}
}
package com.test.thread.netty.heartBeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* 处理客户端的心跳测试
* @author zhb
*
*/
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {
private InetAddress addr;
private static final String SUCCESS_KEY = "auth_success_key";
private ScheduledFuture<?> heartBeat;
// 心跳发送执行
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
//当channel已激活就执行的方法
public void channelActive(ChannelHandlerContext ctx) throws UnknownHostException{
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
String key = "1234";
// 相当于认证证书
String auth = ip + "," + key;
System.err.println("client 发送认证给你信息: " + auth);
// 首先向服务器发送认证
ctx.writeAndFlush(auth);
}
// 读取服务端响应的信息
public void channelRead(ChannelHandlerContext ctx, Object msg){
try {
if(msg instanceof String){
System.err.println("client 收到服务端的响应信息:" +(String)msg);
if(((String)msg).equals(SUCCESS_KEY)){
// 如果服务器端认证成功,开始执行心跳信息 每5秒执行一次
heartBeat = scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx, addr), 0, 5, TimeUnit.SECONDS);
}
}else{
System.err.println(msg);
}
} finally {
// 没有返回信息,要释放msg
ReferenceCountUtil.safeRelease(msg);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.err.println("服务端出现异常");
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
// 数据读取完毕的处理
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.err.println("服务端读取数据完毕");
}
}
package com.test.thread.netty.heartBeat;
import java.net.InetAddress;
import java.util.HashMap;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import org.hyperic.sigar.SigarException;
import io.netty.channel.ChannelHandlerContext;
/**
* 客户端发送一个心跳信息的线程
* @author zhb
*
*/
public class HeartBeatTask implements Runnable {
private ChannelHandlerContext ctx;
private InetAddress addr;
public HeartBeatTask(ChannelHandlerContext ctx, InetAddress addr){
this.ctx = ctx;
this.addr = addr;
}
@Override
public void run() {
try {
RequestInfo reqInfo = new RequestInfo();
reqInfo.setIp(addr.getHostAddress());
// 该类可以自行检查本地的系统相关参数,具体信息可以百度
Sigar sigar = new Sigar();
//cpu prec
CpuPerc cpuPerc = sigar.getCpuPerc();
HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
cpuPercMap.put("combined", cpuPerc.getCombined());
cpuPercMap.put("user", cpuPerc.getUser());
cpuPercMap.put("sys", cpuPerc.getSys());
cpuPercMap.put("wait", cpuPerc.getWait());
cpuPercMap.put("idle", cpuPerc.getIdle());
// memory
Mem mem = sigar.getMem();
HashMap<String, Object> memoryMap = new HashMap<String, Object>();
memoryMap.put("total", mem.getTotal() / 1024L);
memoryMap.put("used", mem.getUsed() / 1024L);
memoryMap.put("free", mem.getFree() / 1024L);
reqInfo.setCpuPercMap(cpuPercMap);
reqInfo.setMemoryMap(memoryMap);
// 发送心跳信息
ctx.writeAndFlush(reqInfo);
} catch (SigarException e) {
e.printStackTrace();
}
}
}
package com.test.thread.netty.heartBeat;
import java.io.Serializable;
import java.util.HashMap;
/**
* 心跳信息实体类
* @author zhb
*
*/
public class RequestInfo implements Serializable {
private String ip ;
// cpu的一些信息
private HashMap<String, Object> cpuPercMap ;
// 内存的一些信息
private HashMap<String, Object> memoryMap;
//.. other field
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public HashMap<String, Object> getCpuPercMap() {
return cpuPercMap;
}
public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}
public HashMap<String, Object> getMemoryMap() {
return memoryMap;
}
public void setMemoryMap(HashMap<String, Object> memoryMap) {
this.memoryMap = memoryMap;
}
}
网友评论