代码很简单,仅是个应用示范,无实际意义。客户端发送“长度+字节数组”形式的消息,服务器收到直接显示。客户端发送时,自动调用编码器编码信息;服务器收到消息,自动调用解码器,完整准确显示收到信息。
一、客户端程序
(一)编码器LenStringEncoder
package com.wallimn.iteye.netty.lenstr;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 将传入的字符串以长度+字节数组的形式编码。
* 客户端发送信息时,直接发送字符串类型数据,netty的handler调用这个编码器进行编码。
*
* <br>
* <br>时间:2019年9月11日 下午10:53:42,作者:wallimn
*/
public class LenStringEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
if(msg!=null){
byte[] bs = msg.getBytes();
out.writeInt(bs.length);
out.writeBytes(bs);
}
}
}
(二)处理器ClientHandler
package com.wallimn.iteye.netty.lenstr;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 客户端消息处理器
* 监听连接Active事件,事件中发送一些测试信息。
*
* <br>
* <br>时间:2019年9月11日 下午10:58:38,作者:wallimn
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String shortMsg = "hello netty, from wallimn.";
for(int i=0; i<50;i++){
ctx.writeAndFlush(shortMsg + " No."+i);
}
String longMsg = "仅仅问候一下,最近挺好的吧,工作忙吗? from wallimn.";
for(int i=0; i<50;i++){
ctx.writeAndFlush(longMsg + " No."+i);
}
System.out.println("消息发送完毕!");
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
(三)客户端Client
package com.wallimn.iteye.netty.lenstr;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.lenstr.Client
* 客户端,仅启动ClientHandler发送些信息,然后退出。
* <br>
* <br>时间:2019年9月11日 下午11:40:51,作者:wallimn
*/
public class Client {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
//未使用函数链式操作,看起来容易懂一点儿。
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_SNDBUF, 10);//发送缓冲区长度
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//bootstrap.handler(new LoggingHandler(LogLevel.INFO));
bootstrap.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LenStringEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
try {
ChannelFuture future = bootstrap.connect("localhost",8585).sync();//连接服务器
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
finally{
group.shutdownGracefully();
}
}
}
二、服务器端
(一)解码器LenStringDecoder
package com.wallimn.iteye.netty.lenstr;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
* 解码器,用于将收到的长度+字节数组的消息解决成字符器。
* 使用长度标识信息,可使信息免受粘包、拆包影响
*
* <br>
* <br>时间:2019年9月12日 下午1:33:37,作者:wallimn
*/
public class LenStringDecoder extends ByteToMessageDecoder {
private final static int HEAD_LENGTH=4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes()<HEAD_LENGTH){
return;
}
in.markReaderIndex();//标记一下读索引位置,信息不完整时恢复。
int msglen = in.readInt();
if(msglen==0){
return;
}
else if(in.readableBytes()<msglen){
in.resetReaderIndex();
return;
}
byte[] msg = new byte[msglen];
in.readBytes(msg);
out.add(new String(msg,0,msg.length));
}
}
(二)处理器ServerHandler
package com.wallimn.iteye.netty.lenstr;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
/**
* 服务器消息处理器
* 显示解码器解完的消息。
*
* <br>
* <br>时间:2019年9月12日 下午1:44:54,作者:wallimn
*/
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String _msg = (String)msg;
System.out.println("收到消息:"+_msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
(三)服务器Server
package com.wallimn.iteye.netty.lenstr;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 启动命令:java -classpath .;netty-all-4.1.38.Final.jar com.wallimn.iteye.netty.lenstr.Server
* 通信服务器的程序
* <br>
* <br>时间:2019年9月11日 下午11:40:18,作者:wallimn
*/
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();//综合管理相关的组件、操纵组件
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LenStringDecoder());
ch.pipeline().addLast(new ServerHandler());
}
});
try {
ChannelFuture future = bootstrap.bind(8585).sync();//绑定监听端口,成功后就可以响应客户端请求
System.out.println("服务启动成功!");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
finally{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
网友评论