由于后面想自己做一个rpc,所以有一个很重要的场景就是心跳。
下面就通过netty来模拟心跳。
这里实现的功能比较简单。满足以下场景
1.如果服务一段时间内,没有收到心跳,则会对客户端发送消息。
2.客户端如果是存活状态,则按一定的频率对服务端发送消息。
先说说IdleStateHandler这个类
通过这个类的构造方法,看出存在3个入参,读超时时间,写超时时间,所有的超时时间。
其实就是,在某个时间段内,没读到数据/没写到数据/既没有读也没有写到,就会有相关操作(通过发生事件的方式)。
public class IdleStateHandler extends ChannelDuplexHandler {
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
}
那么服务端是如何感知到多久时间内,没有读或者写数据呢?
看一下实现。在IdleStateHandler 类中有一个初始化方法,这个方法调用的位置,有注册,以及激活。当channel注册完 或者 处于激活状态,则会进行初始化。
原理也是比较简单,通过定时器轮训的方式去判断在规定时间内是否读/写数据。
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
从上面代码中可以看到三个任务。其实我们看到重点都是 到最后会去发送事件
所以服务端监控到什么,比如很久时间channel处于闲置状态,想去做点什么,可以通过对事件的处理。
ReaderIdleTimeoutTask,WriterIdleTimeoutTask, AllIdleTimeoutTask
源码如下
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
发事件。=================================================
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
private final class WriterIdleTimeoutTask extends AbstractIdleTask {
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
发事件。=================================================
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
private final class AllIdleTimeoutTask extends AbstractIdleTask {
AllIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
发事件。=================================================
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
如何发送事件呢?其实也是通过传播的方式。
我们现在想自己处理了,所以最简单的方式就是实现一个channelHandler,加入到channel的channelPipeline中去即可,然后覆写fireUserEventTriggered方法。
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
接下来还是做一个demo吧
客户端部分
定义一个channelHandler,用于回复服务端
public class CustomChannelHandler extends ChannelInboundHandlerAdapter{
private static String RECEIVE_TMP = "收到消息:%s";
private static String SEND = "在的在的";
private static String SEND_SUCCESS = "发送成功";
private static String SEND_FAIL = "发送失败";
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String info = (String)msg;
System.out.println(String.format(RECEIVE_TMP, info));
ByteBuf byteBuf = Unpooled.copiedBuffer(SEND.getBytes());
ctx.channel().writeAndFlush(byteBuf).addListener(new GenericFutureListener<ChannelPromise>() {
public void operationComplete(ChannelPromise future) throws Exception {
boolean isDone = future.isDone();
if(!isDone) {
System.out.println(SEND_FAIL);
}else {
System.out.println(SEND_SUCCESS);
}
}
});
}
}
客户端,里面用一个线程,循环去发送心跳。
public class Client {
private static final int port = 9527;
private static final String host = "127.0.0.1";
public static void main(String args[]) {
connect();
}
public static void connect() {
NioEventLoopGroup work = new NioEventLoopGroup();
Bootstrap bs = new Bootstrap();
bs.group(work);
bs.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new CustomChannelHandler());
ch.pipeline().addLast(new StringEncoder());
}
});
ChannelFuture cf = null;
try {
cf = bs.connect(host, port).sync();
ChannelFuture tmp = cf;
模拟心跳,频繁给服务端发送消息。
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
tmp.channel().writeAndFlush("heart beat" + System.getProperty("line.separator"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
}
}
服务端部分
这个handler用于处理当NioSocketChannel处于闲置状态。
当处于闲置状态会给客户端连接发送消息。如果是注册中心的话,下一步实现应该就是,没有得到回复,若干次之后就会将该客户端移除出注册表。
public class HandlerHeartbeatHandler extends ChannelDuplexHandler{
private static byte[] HEART_BEAT_MSG = "滴滴滴".getBytes();
private static String SEND_SUCCESS = "发送心跳成功";
private static String SEND_FAIL = "发生心跳失败";
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
boolean matchType = evt instanceof IdleStateEvent;
if(!matchType) {
不是想要处理的世界,就继续传播吧。
ctx.fireUserEventTriggered(evt);
}
IdleStateEvent ise = (IdleStateEvent)evt;
if (ise.state() == IdleState.READER_IDLE) {
ByteBuf byteBuf = Unpooled.copiedBuffer(HEART_BEAT_MSG);
ctx.channel().writeAndFlush(byteBuf).addListener(new GenericFutureListener<ChannelPromise>() {
public void operationComplete(ChannelPromise future) throws Exception {
boolean isDone = future.isDone();
if(!isDone) {
System.out.println(SEND_FAIL);
}else {
System.out.println(SEND_SUCCESS);
}
}
});
}
if(ise.state() == IdleState.WRITER_IDLE) {
}
if(ise.state() == IdleState.ALL_IDLE) {
}
}
}
服务端.
这里需要注意的点,对于心跳的处理器,要加在childHandler中。
public class Server {
private static int port = 9527;
private static String host = "127.0.0.1";
private static int readTimeOut = 5;
private static int writeTimeOut = 0;
private static int idleTimeOut = 0;
public static void main(String args[]) {
start();
}
private static void start() {
EventLoopGroup work = new NioEventLoopGroup();
EventLoopGroup boss = new NioEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(work, boss).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 一秒钟的心跳
ch.pipeline().addLast(new IdleStateHandler(readTimeOut, writeTimeOut, idleTimeOut));
ch.pipeline().addLast(new LineBasedFrameDecoder(128));
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new HandlerHeartbeatHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端的消息" + msg);
}
});
}
});
ChannelFuture cf = null;
try {
cf = sb.bind(host, port).sync();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
work.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
进行简单的测试
先将客户端进行简单的调整,不发送消息。启用服务端,一段时间之后,服务端就会频繁发送消息。
public class Client {
private static final int port = 9527;
private static final String host = "127.0.0.1";
public static void main(String args[]) {
connect();
}
public static void connect() {
NioEventLoopGroup work = new NioEventLoopGroup();
Bootstrap bs = new Bootstrap();
bs.group(work);
bs.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new CustomChannelHandler());
ch.pipeline().addLast(new StringEncoder());
}
});
ChannelFuture cf = null;
try {
cf = bs.connect(host, port).sync();
ChannelFuture tmp = cf;
// new Thread(() -> {
// while (true) {
// try {
// Thread.sleep(1000);
// tmp.channel().writeAndFlush("heart beat" + System.getProperty("line.separator"));
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }).start();
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
work.shutdownGracefully();
}
}
}
服务端
客户端
现在将客户端恢复。定时发送心跳
运行结果如下
服务端
服务端
客户端则没有输出任何信息。
这样子基本的功能就完成了。
网友评论