前言
Netty心跳重连的代码,需要解决以下几个问题
1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理
2)重连对象的管理,也就是bootstrap对象的管理
3)重连机制编写
下面是代码实现...
一.客户端
重连机制主要是在客户端来实现,下面直接上代码
/**
 * Heartbeat client entry point.
 *
 * <p>Builds a Netty {@code Bootstrap}, installs the handler chain supplied by a
 * {@link ConnectionWatchdog} and performs the first connection. Later reconnects
 * are driven by the watchdog, which reuses the same bootstrap and timer.
 */
public class HeartClient {

    public static void main(String[] args) {
        new HeartClient().init("localhost", 8080);
    }

    /**
     * Connects to the given server and blocks until the first connection attempt completes.
     *
     * <p>If the first attempt does not succeed (failure or interruption) the event loop
     * group and the timer are released, because no reconnect is scheduled for a channel
     * that never became active and the idle threads would otherwise keep the JVM alive.
     *
     * @param address host name or IP of the server
     * @param port    TCP port of the server
     */
    public void init(String address, int port) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        // Sends a heartbeat whenever the channel has been write-idle.
        ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();
        // Timer used by the watchdog to schedule delayed reconnect attempts.
        HashedWheelTimer timer = new HashedWheelTimer();

        bootstrap.group(workerGroup)
                .option(ChannelOption.TCP_NODELAY, true)
                .channel(NioSocketChannel.class);

        // The watchdog observes the connection and owns the handler chain, so the
        // same chain can be rebuilt for every (re)connected channel.
        ConnectionWatchdog connectionWatchdog = new ConnectionWatchdog(bootstrap, timer, address, port) {
            @Override
            public ChannelHandler[] handler() {
                return new ChannelHandler[]{
                        this,
                        // writer-idle timeout of 5 seconds; reader/all idle disabled
                        new IdleStateHandler(0, 5, 0),
                        new MessageEncoder(),
                        // max frame 1 MiB, length field at offset 9, 4 bytes wide
                        new MessageDecoder(1 << 20, 9, 4, 0, 0, false),
                        idleStateTrigger,
                        new HeartClientHanlder()
                };
            }
        };

        boolean connected = false;
        try {
            ChannelFuture future;
            // The watchdog synchronizes on the same bootstrap when it swaps the handler.
            synchronized (bootstrap) {
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(connectionWatchdog.handler());
                    }
                });
                future = bootstrap.connect(new InetSocketAddress(address, port));
            }
            future.sync();
            connected = true;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (!connected) {
                timer.stop();
                workerGroup.shutdownGracefully();
            }
        }
    }
}
主要用于重连的时候复用ChannelHandler
/**
 * Supplies the channel handlers that are added to a channel's pipeline.
 *
 * <p>Callers in this file invoke {@link #handler()} from a {@code ChannelInitializer}
 * for the first connection and again for every reconnect, so the same chain can be
 * installed on each new channel.
 */
public interface ChannelHandlerHolder {
    /**
     * @return the handlers to add to the pipeline, in pipeline order
     */
    ChannelHandler[] handler();
}
最关键的部分来了:ConnectionWatchdog 负责监视链路是否断开,一旦断开,就按逐次翻倍的延迟(2秒、4秒、8秒……)循环进行断线重连,最多尝试12次
/**
 * Watches a client connection and re-establishes it after it drops.
 *
 * <p>When the channel becomes inactive a reconnect is scheduled on the supplied
 * {@code Timer} with a delay of {@code 1 << attempts} seconds; at most
 * {@value #MAX_ATTEMPTS} consecutive attempts are made. A successful activation
 * resets the attempt counter. Subclasses provide the handler chain through
 * {@link ChannelHandlerHolder#handler()}.
 */
@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements ChannelHandlerHolder, TimerTask {

    /** Upper bound on consecutive reconnect attempts. */
    private static final int MAX_ATTEMPTS = 12;

    /** Number of reconnect attempts made since the channel was last active. */
    private int attempts;
    /** Bootstrap reused for every reconnect. */
    private Bootstrap bootstrap;
    /** Whether a reconnect should be scheduled when the channel goes inactive. */
    boolean reconnect = true;
    /** Scheduler that runs the delayed reconnect task. */
    private Timer timer;
    /** Remote host and port to connect to. */
    private String address;
    private int port;

    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, String address, int port) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.address = address;
        this.port = port;
    }

    /** Resets the attempt counter and forwards the event. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("当前channel以及激活,尝试次数重置为0");
        attempts = 0;
        ctx.fireChannelActive();
    }

    /** Schedules a delayed reconnect (if allowed) and forwards the event. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("当前channel关闭");
        boolean shouldRetry = reconnect && attempts < MAX_ATTEMPTS;
        if (shouldRetry) {
            attempts++;
            int delaySeconds = 1 << attempts;
            System.out.println("正在尝试重新建立连接:" + "第" + attempts + "次");
            timer.newTimeout(this, delaySeconds, TimeUnit.SECONDS);
        }
        ctx.fireChannelInactive();
    }

    /**
     * Timer callback: installs a fresh handler chain on the bootstrap and connects again.
     * If the attempt fails, an inactive event is fired on the new channel's pipeline so
     * that {@link #channelInactive} schedules the next attempt.
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(ConnectionWatchdog.this.handler());
            }
        };

        ChannelFuture connectFuture;
        // The bootstrap is shared with the initial connect, so guard handler swap + connect.
        synchronized (bootstrap) {
            bootstrap.handler(initializer);
            connectFuture = bootstrap.connect(new InetSocketAddress(this.address, this.port));
        }

        connectFuture.addListener((ChannelFutureListener) result -> {
            if (result.isSuccess()) {
                System.out.println("重新连接成功");
                return;
            }
            // Failed: re-enter channelInactive to trigger the next reconnect attempt.
            result.channel().pipeline().fireChannelInactive();
        });
    }
}
定时发送心跳包
/**
 * Sends a heartbeat message whenever a writer-idle event is reported for the channel.
 *
 * <p>Other idle states are ignored; events that are not {@code IdleStateEvent}s are
 * passed on to the next handler.
 */
@ChannelHandler.Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    /** Heartbeat packet (type 0xAF); created once and reused for every heartbeat. */
    private final Message heartMsg = new Message((byte) 0xAF, System.currentTimeMillis(), "Heartbeat");

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        boolean writerIdle = ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE;
        if (writerIdle) {
            ctx.writeAndFlush(heartMsg);
        }
    }
}
这里是客户端用于处理和服务端的通信
/**
 * Client-side business handler: greets the server when the channel becomes active,
 * prints every message received from the server and then sends the next line typed
 * on the console.
 *
 * <p>NOTE(review): the class is marked {@code @Sharable} but keeps per-channel state
 * ({@code futureTask}); in this file a new instance is created for every pipeline.
 * Each instance also creates its own thread pools, which are never shut down —
 * confirm whether that is acceptable for long-running clients.
 */
@ChannelHandler.Sharable
public class HeartClientHanlder extends ChannelInboundHandlerAdapter {
    private final Scanner scanner = new Scanner(System.in);
    private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    /**
     * Console task currently in flight, or {@code null} if none has been started yet.
     * Written on an executor thread and read on the event loop, hence volatile.
     */
    private volatile FutureTask<Object> futureTask;
    /** Type marker of a business (non-heartbeat) packet. */
    private final int flag = Integer.parseInt("CF", 16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client active:" + new Date());
        ctx.writeAndFlush(new Message((byte) flag, System.currentTimeMillis(), "hello,I am common client!"));
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        System.out.println("from server >>>" + message.getType() + ":" + message.getRequestId() + ":" + message.getBody());
        // Run off the event loop: reading from the console blocks.
        singleThreadExecutor.execute(() -> {
            // A second thread does the blocking console read so that it can be cancelled.
            FutureTask<Object> task = new FutureTask<>(new MsgThread(ctx));
            this.futureTask = task;
            this.executorService.execute(task);
            try {
                // Blocks until the console line has been read and written to the channel.
                task.get();
            } catch (java.util.concurrent.CancellationException ignored) {
                // Expected: channelInactive cancelled the pending console read.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (java.util.concurrent.ExecutionException e) {
                // The console task itself failed; surface it instead of hiding it.
                e.printStackTrace();
            }
        });
    }

    /** Reads one line from the console and sends it to the server as a business packet. */
    class MsgThread implements Callable<Object> {
        ChannelHandlerContext ctx;

        public MsgThread(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public Object call() throws Exception {
            System.out.println("**********:");
            ctx.writeAndFlush(new Message((byte) flag, System.currentTimeMillis(), scanner.nextLine()));
            return null;
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client inactive:" + new Date());
        // Cancel the pending console task, if any. The task is null when the channel
        // goes inactive before any message was read (e.g. a failed reconnect attempt).
        FutureTask<Object> task = this.futureTask;
        if (task != null) {
            task.cancel(true);
        }
    }
}
重连机制实现.png
二.服务端
首先是服务端的代码HeartServer
/**
 * Heartbeat server entry point: accepts client connections and closes those that
 * stay read-idle for too long.
 */
public class HeartServer {

    public static void main(String[] args) {
        HeartServer server = new HeartServer();
        server.init(8080);
    }

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     * Both event loop groups are always shut down before this method returns, whether
     * the server stopped normally, binding failed, or the thread was interrupted.
     *
     * @param port TCP port to listen on
     */
    public void init(int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // Shared handler that reacts to reader-idle events on client channels.
        AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();

        serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // reader-idle timeout of 5 seconds; writer/all idle disabled
                        pipeline.addLast(new IdleStateHandler(5, 0, 0));
                        pipeline.addLast(new MessageDecoder(Integer.MAX_VALUE, 9, 4, 0, 0, false));
                        pipeline.addLast(new MessageEncoder());
                        pipeline.addLast(idleStateTrigger);
                        pipeline.addLast(new HeartServerHandler());
                    }
                });

        try {
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("server start ...");
            // Blocks here until the server channel is closed.
            future.channel().closeFuture().sync();
            System.out.println("server stop ...");
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
HeartServerHandler代码
/**
 * Server-side business handler: answers business packets with the current server
 * time, logs heartbeat packets, and closes the channel when an exception reaches it.
 */
public class HeartServerHandler extends ChannelInboundHandlerAdapter {

    /** Packet type of a business message. */
    private static final byte BUSINESS_TYPE = (byte) 0xCF;
    /** Packet type of a heartbeat message. */
    private static final byte HEARTBEAT_TYPE = (byte) 0xAF;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server Active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final Message incoming = (Message) msg;
        final byte type = incoming.getType();

        if (type == BUSINESS_TYPE) {
            System.out.println(ctx.channel().remoteAddress() + ">>>" + incoming.getType() + ":" + incoming.getRequestId() + ":" + incoming.getBody());
            // Echo type and request id back, with the server time as the body.
            Message reply = new Message(incoming.getType(), incoming.getRequestId(), "server time->" + new Date());
            ctx.writeAndFlush(reply);
        } else if (type == HEARTBEAT_TYPE) {
            System.out.println(ctx.channel().remoteAddress() + ">>>>>>>>>>>>>>>>" + incoming.getBody());
        }

        // Packets of any type are still forwarded to the next handler.
        ctx.fireChannelRead(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("已断开客户端" + ctx.channel().remoteAddress());
        ctx.channel().close();
    }
}
这里如果长时间没有触发读操作,服务端就会自动和客户端断开连接。这种机制是基于IdleStateHandler来实现的,具体可以见我的上一篇文章。
/**
 * Reacts to reader-idle events on a client channel by throwing an exception.
 *
 * <p>Other idle states are ignored; events that are not {@code IdleStateEvent}s are
 * passed on to the next handler.
 */
@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        boolean readerIdle = ((IdleStateEvent) evt).state() == IdleState.READER_IDLE;
        if (readerIdle) {
            throw new Exception("客户端空闲时间长,自动断开连接");
        }
    }
}
三.自定义的拆包粘包规则
Message
/**
 * Protocol message exchanged between client and server.
 *
 * <p>A message carries a one-byte type, a request id, the byte length of its body
 * and the body itself. The body length is always measured in UTF-8 bytes.
 */
public class Message {
    /** Charset used to convert between the body text and its bytes. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Message type marker. */
    private byte type;
    /** Request id. */
    private long requestId;
    /** Length of the body in UTF-8 bytes. */
    private int messageLength;
    /** Message body. */
    private String body;

    /**
     * Creates a message from a text body.
     *
     * @param type      message type marker
     * @param requestId request id
     * @param body      body text; must not be {@code null}
     * @throws NullPointerException if {@code body} is {@code null}
     */
    public Message(byte type, long requestId, String body) {
        this.type = type;
        this.requestId = requestId;
        this.messageLength = body.getBytes(UTF_8).length;
        this.body = body;
    }

    /**
     * Creates a message from an already encoded body.
     *
     * @param type      message type marker
     * @param requestId request id
     * @param data      UTF-8 encoded body; must not be {@code null}
     * @throws NullPointerException if {@code data} is {@code null}
     */
    public Message(byte type, long requestId, byte[] data) {
        this.type = type;
        this.requestId = requestId;
        this.messageLength = data.length;
        this.body = new String(data, UTF_8);
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    public long getRequestId() {
        return requestId;
    }

    public void setRequestId(long requestId) {
        this.requestId = requestId;
    }

    public int getMessageLength() {
        return messageLength;
    }

    public void setMessageLength(int messageLength) {
        this.messageLength = messageLength;
    }

    public String getBody() {
        return body;
    }

    /**
     * Replaces the body and updates the message length to match it, so the two
     * never disagree. A {@code null} body yields a length of 0.
     *
     * @param body new body text, may be {@code null}
     */
    public void setBody(String body) {
        this.body = body;
        this.messageLength = (body == null) ? 0 : body.getBytes(UTF_8).length;
    }
}
MessageDecoder
/**
 * Decodes length-prefixed frames into {@link Message} objects.
 *
 * <p>Frame layout as read by {@link #decode}: 1 byte type, 8 bytes request id,
 * 4 bytes body length, followed by the body bytes.
 */
public class MessageDecoder extends LengthFieldBasedFrameDecoder {
    /** Size of the fixed header: type (1) + request id (8) + body length (4). */
    private static final int HEADER_SIZE = 13;

    public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }

    /**
     * Extracts one frame from {@code in} and converts it to a {@link Message}.
     *
     * @return the decoded message, or {@code null} when a complete frame is not
     *         available yet
     * @throws RuntimeException if the extracted frame is not a {@code ByteBuf} or its
     *                          declared body length does not fit the frame
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null || in.readableBytes() < HEADER_SIZE) {
            return null;
        }
        Object res = super.decode(ctx, in);
        if (res == null) {
            // The frame is still incomplete; wait for more bytes instead of failing.
            return null;
        }
        if (!(res instanceof ByteBuf)) {
            throw new RuntimeException("协议异常");
        }
        ByteBuf frame = (ByteBuf) res;
        try {
            byte type = frame.readByte();
            long requestId = frame.readLong();
            int length = frame.readInt();
            if (length < 0 || frame.readableBytes() < length) {
                // The length field disagrees with the extracted frame.
                throw new RuntimeException("协议异常");
            }
            byte[] data = new byte[length];
            frame.readBytes(data);
            return new Message(type, requestId, data);
        } finally {
            // The frame is consumed here and not passed on, so it must be released.
            frame.release();
        }
    }
}
MessageEncoder
/**
 * Encodes a {@link Message} as: 1 byte type, 8 bytes request id, 4 bytes body length,
 * then the body bytes.
 *
 * <p>The body is always encoded as UTF-8, matching the charset {@link Message} uses
 * for its byte-array constructor, so both ends agree regardless of the platform's
 * default charset.
 */
public class MessageEncoder extends MessageToByteEncoder<Message> {
    /** Charset of the body on the wire. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        byteBuf.writeByte(message.getType());
        byteBuf.writeLong(message.getRequestId());
        String body = message.getBody();
        // A null body is written as an empty body rather than failing the write.
        byte[] data = (body == null) ? new byte[0] : body.getBytes(UTF_8);
        byteBuf.writeInt(data.length);
        byteBuf.writeBytes(data);
    }
}
网友评论