粘包示例
服务端
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count; // 计数器
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 接收消息
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
System.out.println("服务端收到: " + new String(bytes, Charset.forName("UTF-8")));
System.out.println("计数:" + (++count));
// 发送消息
ctx.writeAndFlush(Unpooled.copiedBuffer("abc", Charset.forName("UTF-8")));
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
客户端
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count; // 计数器
// 发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer("ABC", Charset.forName("UTF-8")));
}
}
// 接收消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
System.out.println("客户端收到: " + new String(bytes, Charset.forName("UTF-8")));
System.out.println("计数:" + (++count));
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
自定义协议处理粘包问题
/**
* Netty所谓的自定义协议,其实就是规定好输入输出的格式
*/
public class Protocol {
private int length;
private byte[] content;
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
public class ClientHandler extends SimpleChannelInboundHandler<Protocol> {
private int count; // 计数器
// 发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
// 每次发送的消息都是按照自定义协议的格式
Protocol protocol = new Protocol();
protocol.setLength("ABC".getBytes().length);
protocol.setContent("ABC".getBytes());
ctx.writeAndFlush(protocol);
}
}
// 接收消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, Protocol msg) throws Exception {
System.out.println("客户端收到: length - " + msg.getLength() + ", content - " + new String(msg.getContent(), Charset.forName("UTF-8")));
System.out.println("计数:" + (++count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 出站编码器
*/
public class Encoder extends MessageToByteEncoder<Protocol> {
@Override
protected void encode(ChannelHandlerContext ctx, Protocol msg, ByteBuf out) throws Exception {
// 每次写出站的都是一个自定义Protocol
// 按照一个Length,一个Content的格式出站
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* 入站解码器
*/
public class Decoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
/*从出站解码器接收数据,出站时数据格式是:
一个Length表示接下来数据的长度,
一个Content是接下来的数据
+--------+----------------+
| Length | Actual Content |
| 0x000E | "HELLO, WORLD" |
+--------+----------------+
*/
// 接收数据,解码
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
// 封装成自定义协议的格式,转发给下一个Handler
Protocol protocol = new Protocol();
protocol.setLength(length);
protocol.setContent(content);
out.add(protocol);
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
public class ServerHandler extends SimpleChannelInboundHandler<Protocol> {
private int count; // 计数器
@Override
protected void channelRead0(ChannelHandlerContext ctx, Protocol msg) throws Exception {
// 接收消息
System.out.println("服务端收到: length - " + msg.getLength() + ", content - " + new String(msg.getContent(), Charset.forName("UTF-8")));
System.out.println("计数:" + (++count));
// 发送消息
Protocol protocol = new Protocol();
protocol.setLength("abc".getBytes().length);
protocol.setContent("abc".getBytes());
ctx.writeAndFlush(protocol);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new Decoder());
pipeline.addLast(new Encoder());
pipeline.addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new Decoder());
pipeline.addLast(new Encoder());
pipeline.addLast(new ClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
网友评论