1. DelimiterBasedFrameDecoder使用特殊字符作为分割,如果使用的话,注意特殊字符不能在真正要传输的内容中出现,
客户端
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup ();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap ();
//设置相关参数
bootstrap.group (group) //设置线程组
.channel (NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
.handler (new ChannelInitializer<SocketChannel> () {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//字符串编码器
//channel.pipeline ().addLast (new StringEncoder ());
//long编码器
// channel.pipeline ().addLast (new OutEncoder ());
ByteBuf delimiter = Unpooled.copiedBuffer ("$_$".getBytes ());
channel.pipeline ().addLast (new DelimiterBasedFrameDecoder (1024, delimiter));
//加入处理器
channel.pipeline ().addLast (new NettyClientHandler ());
}
});
System.out.println ("netty client start");
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect ("127.0.0.1", 9000).sync ();
//对关闭通道进行监听
channelFuture.channel ().closeFuture ().sync ();
} finally {
group.shutdownGracefully ();
}
}
}
客户端的handler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("发送消息");
for (int i = 0; i < 2; i++) {
//拆包用分隔符
ByteBuf buf = Unpooled.copiedBuffer ("HelloServer$_$", CharsetUtil.UTF_8);
ctx.writeAndFlush (buf);
}
ctx.fireChannelActive ();
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
//msg就额是接受的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = ( ByteBuf ) msg;
//因为已经使用了StringDecoder传过来的已经string类型
System.out.println ("收到服务端的消息:" + buf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace ();
ctx.close ();
}
}
服务端
public static void main(String[] args) throws Exception {
//创建两个线程组bossGroup和workerGroup,
//NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup (1);
EventLoopGroup workerGroup = new NioEventLoopGroup (8);
try {
//创建服务器端的引导对象
ServerBootstrap bootstrap = new ServerBootstrap ();
//将两个线程组放进引导对象
bootstrap.group (bossGroup, workerGroup)
//NioServerSocketChannel服务端channel
.channel (NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option (ChannelOption.SO_BACKLOG, 1024)
.childHandler (new ChannelInitializer<SocketChannel> () {//创建通道初始化对象,设置初始化参数
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//每一个链接过来(每一次创建channel)都会执行这个方法
System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//用分割符处理粘包问题,分隔符可以是多个,
// 1024代表1024个字节内还没有找到分隔符抛出异常,TooLongFrameException,
//如果后续的Handler重写了exceptionCaught方法就会调用exceptionCaught方法
//DelimiterBasedFrameDecoder分割是转义成ByteBuf才分割的,所以如果添加了StringDecoder的话,
// 要把StringDecoder放在DelimiterBasedFrameDecoder的后面
ByteBuf delimiter = Unpooled.copiedBuffer ("$_$".getBytes ());
ch.pipeline ().addLast (new DelimiterBasedFrameDecoder (1024, delimiter));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());
}
});
System.out.println ("netty server start。。");
//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
//我们可以去掉sync,添加事件监听,如果链接成功失败相对应的处理
ChannelFuture cf = bootstrap.bind (9000);
//给cf注册监听器,监听我们关心的事件
cf.addListener (new ChannelFutureListener () {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess ()) {
//这里只是简单的打印,
System.out.println ("监听端口9000成功");
} else {
System.out.println ("监听端口9000失败");
}
}
});
//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
cf.channel ().closeFuture ().sync ();
} finally {
bossGroup.shutdownGracefully ();
workerGroup.shutdownGracefully ();
}
}
}
服务端的handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("channelActive打印" + Thread.currentThread ().getName () + ctx.channel ().remoteAddress ());
//如果此handler后续还有handler的话,只有调用了fireXXX才能向下继续调用
ctx.fireChannelActive ();
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//使用了Stingdecoder所以这里接受到消息就是string类型的,不需要再次抓交换
System.out.println ("客户端发送消息是:" +msg);
//这是返回给客户端消息,如果使用StringEncoder,直接发String类型就可以,如果没有使用就先转为ByteBuf
ByteBuf buf1 = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
Channel channel = ctx.channel ();
channel.writeAndFlush (buf1);
}
/**
* 数据读取完毕处理方法,这个方法个人认为不太好用,
*
* @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
* System.out.println ("服务端接受消息结束");
* ByteBuf buf = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
* ctx.writeAndFlush (buf);
* }
*/
@Override
//处理异常, 一般是需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println (cause.getMessage ());
ctx.close ();
}
}
2. FixedLengthFrameDecoder定长,这个也不常用
//每一个链接过来(每一次创建channel)都会执行这个方法
System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//固定接受10个字节,如果超过这个字节数,超出的部分存在tcp的缓存中,等待下一次传输,超出的部分不会丢弃掉
ch.pipeline ().addLast (new FixedLengthFrameDecoder (10));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());
客户端发送的消息为HelloServer11个字节,可以看到超出的r会放到下一次的消息中,
一个汉字为3个字节,所以很难在实际项目中使用
客户端发送消息是:HelloServe
客户端发送消息是:rHelloServ
3. LineBasedFrameDecoder 换行符在发送消息加上换行符 \n
System.out.println ("初始化pipeline");
//编码,将String转码为ByteBuf
ch.pipeline ().addLast ("encode", new StringEncoder ());
//如果字节超过1024还没有找到换行符则抛出异常
ch.pipeline ().addLast (new LineBasedFrameDecoder (1024));
//解码,将ByteBuf解码为String
ch.pipeline ().addLast ("decode", new StringDecoder ());
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
ch.pipeline ().addLast (new NettyServerHandler ());
4. LengthFieldBasedFrameDecoder和LengthFieldPrepender
LengthFieldPrepender编码器,将发送消息的前面加上请求体的字节长度
LengthFieldBasedFrameDecoder获取请求头的长度,根据长度获取请求体的信息
个人认为这个比较常用
客户端
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup ();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap ();
//设置相关参数
bootstrap.group (group) //设置线程组
.channel (NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现
.handler (new ChannelInitializer<SocketChannel> () {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//规定标记消息提长度所占字节数
channel.pipeline().addLast(new LengthFieldPrepender (2));
channel.pipeline ().addLast (new NettyClientHandler ());
}
});
System.out.println ("netty client start");
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect ("127.0.0.1", 9000).sync ();
//对关闭通道进行监听
channelFuture.channel ().closeFuture ().sync ();
} finally {
group.shutdownGracefully ();
}
}
}
客户端handler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("发送消息");
for (int i = 0; i < 2; i++) {
//拆包用分隔符
ByteBuf buf = Unpooled.copiedBuffer ("HelloServer"+i, CharsetUtil.UTF_8);
ctx.writeAndFlush (buf);
}
ctx.fireChannelActive ();
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
//msg就额是接受的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = ( ByteBuf ) msg;
//因为已经使用了StringDecoder传过来的已经string类型
System.out.println ("收到服务端的消息:" + buf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace ();
ctx.close ();
}
}
服务端
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组bossGroup和workerGroup,
//NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup (1);
EventLoopGroup workerGroup = new NioEventLoopGroup (8);
try {
//创建服务器端的引导对象
ServerBootstrap bootstrap = new ServerBootstrap ();
//将两个线程组放进引导对象
bootstrap.group (bossGroup, workerGroup)
//NioServerSocketChannel服务端channel
.channel (NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option (ChannelOption.SO_BACKLOG, 1024)
.childHandler (new ChannelInitializer<SocketChannel> () {//创建通道初始化对象,设置初始化参数
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//每一个链接过来(每一次创建channel)都会执行这个方法
System.out.println ("初始化pipeline");
// 这里将LengthFieldBasedFrameDecoder添加到pipeline的首位,因为其需要对接收到的数据
// 进行长度字段解码,这里也会对数据进行粘包和拆包处理
//maxFrameLength:指定了每个包所能传递的最大数据包大小;
//lengthFieldOffset:指定了长度字段在字节码中的偏移量;
//lengthFieldLength:指定了长度字段所占用的字节长度;
//lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度;
//initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节。
//1024最大数据包长度,包括长度所占的字节数
//0,因为第一字符开始就是长度
//2消息体长度所占字节数
//0因为第一个字符就是长度
//2去掉长度所占字节数,获取剩下的消息体
//注2就是下面LengthFieldPrepender中的规定长度所占的字节数
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024, 0, 2, 0, 2));
// LengthFieldPrepender是一个编码器,主要是在响应字节数据前面添加字节长度字段
ch.pipeline().addLast(new LengthFieldPrepender (2));
//handler一般都是放在最后面,建议一个就可以,如果业务复杂拆分多个,那在handler中调用fireXXX才会向下一个handler继续调用
//StringDecoder要放到LengthFieldBasedFrameDecoder后面,对得到消息体的ByteBuf转码为String类型,个人认为这个比较常用
ch.pipeline ().addLast ("encode", new StringDecoder ());
ch.pipeline ().addLast (new NettyServerHandler ());
}
});
System.out.println ("netty server start。。");
//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
//我们可以去掉sync,添加事件监听,如果链接成功失败相对应的处理
ChannelFuture cf = bootstrap.bind (9000);
//给cf注册监听器,监听我们关心的事件
cf.addListener (new ChannelFutureListener () {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess ()) {
//这里只是简单的打印,
System.out.println ("监听端口9000成功");
} else {
System.out.println ("监听端口9000失败");
}
}
});
//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
cf.channel ().closeFuture ().sync ();
} finally {
bossGroup.shutdownGracefully ();
workerGroup.shutdownGracefully ();
}
}
}
服务端handler
/**
* 继承了ChannelInboundHandlerAdapter 属于Inbound,输入的处理器
*/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println ("channelActive打印" + Thread.currentThread ().getName () + ctx.channel ().remoteAddress ());
//如果此handler后续还有handler的话,只有调用了fireXXX才能向下继续调用
ctx.fireChannelActive ();
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//使用了Stingdecoder所以这里接受到消息就是string类型的,不需要再次抓交换
System.out.println ("客户端发送消息是:" +msg);
}
/**
* 数据读取完毕处理方法,这个方法个人认为不太好用,
*
* @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
* System.out.println ("服务端接受消息结束");
* ByteBuf buf = Unpooled.copiedBuffer ("HelloClient$_$", CharsetUtil.UTF_8);
* ctx.writeAndFlush (buf);
* }
*/
@Override
//处理异常, 一般是需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println (cause.getMessage ());
ctx.close ();
}
}
网友评论