客户端代码:
/**
 * Demo Netty client.
 *
 * <p>Connects to {@code 127.0.0.1:8000}; once the connection is established a
 * worker thread repeatedly writes a fixed UTF-8 payload to the channel, waiting
 * for each write to complete before issuing the next one. When the connection
 * attempt fails, or the channel is later closed, the event loop group and the
 * worker pool are shut down so the JVM is not kept alive by their threads.
 */
public class NettyClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8000;

    /** Payload sent on every iteration; encoded once instead of once per write. */
    private static final byte[] PAYLOAD = "你好!".getBytes(Charset.forName("utf-8"));

    private static final ExecutorService exec = Executors.newFixedThreadPool(5);

    /**
     * Configures the bootstrap and starts the asynchronous connection attempt.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new XXCodec());
                        ch.pipeline().addLast(new FirstClientHandler());
                    }
                });

        bootstrap.connect(HOST, PORT).addListener(future -> {
            if (!future.isSuccess()) {
                System.out.println("客户端连接失败!");
                if (future.cause() != null) {
                    // Surface the reason instead of discarding it.
                    future.cause().printStackTrace();
                }
                shutdown(group);
                return;
            }
            System.out.println("客户端连接成功!");
            Channel channel = ((ChannelFuture) future).channel();
            // Release the threads once the connection goes away.
            channel.closeFuture().addListener(closed -> shutdown(group));
            startThread(channel);
        });
    }

    /**
     * Submits the sender task, which writes {@link #PAYLOAD} for as long as the
     * channel is active.
     *
     * @param channel the connected channel to write to
     */
    private static void startThread(Channel channel) {
        exec.submit(() -> {
            try {
                while (channel.isActive()) {
                    // 1. Build the outgoing buffer.
                    ByteBuf buffer = Unpooled.buffer(PAYLOAD.length).writeBytes(PAYLOAD);
                    // 2. Write it and wait for completion before sending the next one.
                    channel.writeAndFlush(buffer).sync();
                }
            } catch (InterruptedException e) {
                // Restore the flag so the pool can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // sync() rethrows the write failure; report it and stop sending.
                System.out.println("客户端发送失败: " + e);
                channel.close();
            }
        });
    }

    /** Stops the sender pool and the event loop group. */
    private static void shutdown(NioEventLoopGroup group) {
        exec.shutdownNow();
        group.shutdownGracefully();
    }
}
编解码工具(客户端和服务端相同;注意需根据实际消息的字节长度修改decode中固定读取的字节数):
/**
 * Codec that encodes outbound {@code String}s and decodes inbound bytes into
 * {@code String}s of a fixed byte length.
 *
 * <p>Outbound: each string is written as a 4-byte length, the UTF-8 bytes, and
 * a trailing {@code '\n'}. Inbound: the stream is cut into frames of
 * {@code frameLength} bytes (18 by default) and each frame is decoded as UTF-8.
 *
 * <p>NOTE(review): the two directions do not use the same framing, so a peer
 * running this codec cannot parse what {@link #encode} produces. Confirm the
 * intended wire format before relying on {@code encode}.
 */
public class XXCodec extends ByteToMessageCodec<String> {
    private static final Charset UTF_8 = Charset.forName("utf-8");
    private static final int DEFAULT_FRAME_LENGTH = 18;

    /** Number of bytes consumed per decoded message. */
    private final int frameLength;

    /** Creates a codec that decodes 18-byte frames. */
    public XXCodec() {
        this(DEFAULT_FRAME_LENGTH);
    }

    /**
     * Creates a codec that decodes frames of the given size.
     *
     * @param frameLength bytes per inbound frame; must be positive
     * @throws IllegalArgumentException if {@code frameLength} is not positive
     */
    public XXCodec(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    /**
     * Writes {@code msg} as {@code [int length][UTF-8 bytes]['\n']}.
     *
     * @param ctx the handler context (unused)
     * @param msg the string to encode
     * @param out the buffer the encoded bytes are appended to
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] bytes = msg.getBytes(UTF_8);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
        // A trailing '\n' marks the end of the data.
        out.writeByte('\n');
    }

    /**
     * Emits one string per complete frame available in {@code in}.
     *
     * <p>Nothing is consumed until a whole frame has arrived, so a partial frame
     * is left in the buffer for the next call instead of raising an
     * {@code IndexOutOfBoundsException} or being dropped.
     *
     * @param ctx the handler context (unused)
     * @param in  the cumulated inbound bytes
     * @param out the list decoded messages are added to
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < frameLength) {
            return; // wait for more data
        }
        // readSlice shares the underlying memory and does not bump the reference
        // count, so there is no temporary buffer to release.
        out.add(in.readSlice(frameLength).toString(UTF_8));
    }
}
/**
 * Inbound handler that counts received messages and logs the count once per
 * second.
 *
 * <p>The counter and the reporting scheduler are static, so the count is the
 * total across every instance of this handler. The reporting task is scheduled
 * exactly once, when the class is initialized; creating additional instances
 * (for example one per channel) does not add further reporting tasks.
 */
@ChannelHandler.Sharable
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(FirstClientHandler.class);
    private static final ScheduledExecutorService scheduled
            = Executors.newScheduledThreadPool(1);
    /** Messages received since the last report. */
    private static final AtomicInteger num = new AtomicInteger(0);

    static {
        // One shared reporter for the shared counter; must come after the
        // static fields above so they are initialized before the first run.
        scheduled.scheduleAtFixedRate(FirstClientHandler::reportAndReset, 0, 1, TimeUnit.SECONDS);
    }

    /**
     * Creates a handler. The throws clause is kept only for compatibility with
     * existing callers; nothing is thrown here.
     */
    public FirstClientHandler() throws Exception {
    }

    /**
     * Counts one received message.
     *
     * @param ctx the handler context (unused)
     * @param msg the inbound message; only its arrival is recorded
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        num.incrementAndGet();
    }

    /** Logs the number of messages received since the previous report and resets it. */
    public void loggerInfo() {
        reportAndReset();
    }

    /**
     * Reads and clears the counter in one atomic step so that increments
     * arriving between the read and the reset are not lost.
     */
    private static void reportAndReset() {
        logger.info("当前执行了{}次", num.getAndSet(0));
    }
}
在服务器端:
/**
 * Server-side inbound handler that answers every received message with a
 * fixed reply.
 */
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * Sends the reply payload back over the channel that delivered {@code msg}.
     *
     * @param ctx the handler context, used to allocate the buffer and reach the channel
     * @param msg the inbound message; its content is not inspected
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Placeholder payload; the real data-generating call goes here.
        String payload = "ABCD";
        byte[] encoded = payload.getBytes(Charset.forName("utf-8"));

        // Reply to the client.
        ByteBuf reply = ctx.alloc().buffer();
        reply.writeBytes(encoded);
        ctx.channel().writeAndFlush(reply);
    }
}
网友评论