客户端断线重连机制。
客户端数量多,且需要传递的数据量级较大。可以周期性的发送数据的时候,使用。要求对数据的即时性不高的时候,才可使用。
优点: 可以使用数据缓存。不是每条数据进行一次数据交互。可以定时回收资源,对资源利用率高。相对来说,即时性可以通过其他方式保证。如: 120秒自动断线。数据变化1000次请求服务器一次。300秒中自动发送不足1000次的变化数据。
image.png
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
package com.bjsxt.socket.netty.timer;
import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
public class Server4Timer {
// 监听线程组,监听客户端请求
private EventLoopGroup acceptorGroup = null;
// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
private EventLoopGroup clientGroup = null;
// 服务启动相关配置信息
private ServerBootstrap bootstrap = null;
public Server4Timer(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 绑定线程组
bootstrap.group(acceptorGroup, clientGroup);
// 设定通讯模式为NIO
bootstrap.channel(NioServerSocketChannel.class);
// 设定缓冲区大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
// 增加日志Handler,日志级别为info
// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
}
public ChannelFuture doAccept(int port) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。
// 构造参数,就是间隔时长。 默认的单位是秒。
// 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit);
ch.pipeline().addLast(new ReadTimeoutHandler(3));
ch.pipeline().addLast(new Server4TimerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4Timer server = null;
try{
server = new Server4Timer();
future = server.doAccept(9999);
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
/**
* @Sharable注解 -
* 代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
* 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
*
*/
package com.bjsxt.socket.netty.timer;
import com.bjsxt.socket.utils.ResponseMessage;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public class Server4TimerHandler extends ChannelHandlerAdapter {
// 业务处理逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from client : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
ResponseMessage response = new ResponseMessage(0L, "test response");
ctx.writeAndFlush(response);
}
// 异常处理逻辑
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 单线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. connect连接服务,并发起请求
*/
package com.bjsxt.socket.netty.timer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.bjsxt.socket.utils.RequestMessage;
import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.WriteTimeoutHandler;
public class Client4Timer {
// 处理请求和处理服务端响应的线程组
private EventLoopGroup group = null;
// 服务启动相关配置信息
private Bootstrap bootstrap = null;
private ChannelFuture future = null;
public Client4Timer(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 绑定线程组
bootstrap.group(group);
// 设定通讯模式为NIO
bootstrap.channel(NioSocketChannel.class);
// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
}
public void setHandlers() throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 写操作自定断线。 在指定时间内,没有写操作,自动断线。
ch.pipeline().addLast(new WriteTimeoutHandler(3));
ch.pipeline().addLast(new Client4TimerHandler());
}
});
}
public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
if(future == null){
future = this.bootstrap.connect(host, port).sync();
}
if(!future.channel().isActive()){
future = this.bootstrap.connect(host, port).sync();
}
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4Timer client = null;
ChannelFuture future = null;
try{
client = new Client4Timer();
client.setHandlers();
future = client.getChannelFuture("localhost", 9999);
for(int i = 0; i < 3; i++){
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test"+i, new byte[0]);
future.channel().writeAndFlush(msg);
TimeUnit.SECONDS.sleep(2);
}
TimeUnit.SECONDS.sleep(5);
future = client.getChannelFuture("localhost", 9999);
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test", new byte[0]);
future.channel().writeAndFlush(msg);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.bjsxt.socket.netty.timer;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class Client4TimerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from server : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}
/**
* 当连接建立成功后,出发的代码逻辑。
* 在一次连接中只运行唯一一次。
* 通常用于实现连接确认和资源初始化的。
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active");
}
}
网友评论