一、Netty编程模板
1、Netty编程步骤:
netty客户端/netty服务器端,代码流程:
1.创建客户端类/服务器端(c/s)类
2.创造c/s构造方法,主要传入参数为地址和端口
3.创建c/s的引导器Bootstrap(ServerBootstrap)
4.创建c/s的生命周期组EventLoopGroup为NioEventLoopGroup()
5.配置装载引导器,把生命周期组EventLoopGroup装载到引导器,配置地址和端口,配置通道类型,配置操作句柄Handler
4.填写Handler,初始化装载channel,并对channel配置一个或多个pipeline,即ChannelInboundHandler(ChannelOutboundHandler),相当于注册相应的逻辑程序(handler)
5.创建ChannelFuture启动//引导启动连接绑定(connect/bind)阻塞
6.阻塞关闭关闭占位符ChannelFuture
7.关闭整个线程组
客户端应用/服务器端应用代码编写流程
编写操作继承ChannelInboundHandler(ChannelOutboundHandler)
客户端重写channelActive,向服务端发送数据,服务端重写channelRead读取客户端的数据并向客户端发送数据
客户端channelRead0读取服务端的数据
重写异常处理
2、Netty编程代码模板:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
//构造服务端方法
public class SendServer {
private final int port;
public SendServer(int port) {
this.port=port;
}
//服务器端启动
private void start() throws Exception {
EventLoopGroup group = null;
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();//创建server引导
group = new NioEventLoopGroup();//reactor线程组
serverBootstrap.group(group)
.channel(NioServerSocketChannel.class)//设置channel类型
.localAddress("localhost",port)//绑定端口
//reacor handle
.childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
@Override
protected void initChannel(
io.netty.channel.socket.SocketChannel ch)
throws Exception {
ch.pipeline().addFirst((ChannelHandler) new SendServerForWork());
}
});
ChannelFuture sync = serverBootstrap.bind().sync();//异步绑定
System.out.println("开始监听,地址端口为:" + sync.channel());
sync.channel().closeFuture().sync();
}finally{
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
System.out.println("server start.......");
new SendServer(20000).start();
}
}
public class SendServerForWork extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
//读取数据
System.out.println("服务器读取数据。。");
ByteBuf buf = (ByteBuf)msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String string = new String(bytes, "UTF-8");
System.out.println("读取客户端的数据为:"+ string);
//向客户端发送数据
System.out.println("服务器向客户端发送数据...");
String currenttime = new Date(System.currentTimeMillis()).toString();
ByteBuf copiedBuffer = Unpooled.copiedBuffer(currenttime.getBytes());
ctx.write(copiedBuffer);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务器读写数据完毕。。。");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("服务器异常处理。。。");
ctx.close();
}
}
public class SendClinet {
private final String host;
private final int port;
//构造客户端方法
public SendClinet(String host,int port){
this.host=host;
this.port=port;
}
//客户端启动
public void start() throws InterruptedException {
//创建生命周期组,EventLoopGroup包含一个或多个EventLoop,而EventLoop在一个生命周期内只能绑定一个Thread
//每一个EventLoop的I/O事件都是由这个Thread处理,一个channel在生命周期内只能对应一个EventLoop,
//但一个EventLoop可以被分给一个或多个channel,因此channel和thread是对应的
EventLoopGroup group = null;
try {
Bootstrap bootstrap = new Bootstrap();//创建一个引导启动类
group = new NioEventLoopGroup();
bootstrap.group(group)//把事件生命周期组EventLoopGroup注册引导启动类中去启动
.channel(NioSocketChannel.class)//注册channel类型为NioSocketChannel,这个类型还有NioSctpChannel,NioDatagramChannel,LocalServerChannel,EmbeddedChannel
.remoteAddress(new InetSocketAddress(host, port))//注册连接的服务器地址端口
//注册事件操作句柄,使用childHandler时候不可以,所以只能用handler代替了
.handler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
@Override
protected void initChannel(//初始化通道
io.netty.channel.socket.SocketChannel ch)
throws Exception {
//一个SocketChannel可以添加多个ChannelHandler,可以多加addLast,
//这个ChannelHandler,有两种In和Out即ChannelInboundHandler,ChannelOutboundHandler
//pipeline 在处理In和Out顺序是,in是从头部开始,out是尾部开始,例如 in1,out1,out2,in2,运行结果是in1->in2,out2->out1
//ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler,
//ctx.write()方法执行后,需要调用flush()方法才能令它立即执行,
//pipeline中outhandler不能放在最后,否则不生效
ch.pipeline().addLast((ChannelHandler) new SendClientForWork());
}
});
// 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
ChannelFuture sync=bootstrap.connect().sync();//引导启动连接,ChannelFuture为将要执行操作的占位符
//sync.channel().close().sync();
sync.channel().closeFuture().sync();//关闭占位符,而不关闭整个通道,close是关闭整个客户端
}finally {
group.shutdownGracefully().sync();//关闭整个线程组
}
}
public static void main(String[] args) throws Exception {
System.out.println("client start....");
new SendClinet("localhost",20000).start();//指定连接服务器的地址和端口
}
}
//继承ChannelInboundHandler
public class SendClientForWork extends SimpleChannelInboundHandler<ByteBuf>{
// 客户端连接服务器后被调用,并向服务器发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接服务器,并开始发送数据。。。");
byte[] req = "查询时间".getBytes();//查询命令设置,并序列化
ByteBuf buffer = Unpooled.buffer(req.length);
buffer.writeBytes(req);
ctx.writeAndFlush(buffer);//发送数据
};
// 从服务器接收到数据后调用,处理收到的数据逻辑
@Override
protected void channelRead0(ChannelHandlerContext arg0, ByteBuf arg1)
throws Exception {
System.out.println("客户端读取服务端的数据...");
ByteBuf msg = arg1;
byte[] bytes=new byte[msg.readableBytes()];
msg.readBytes(bytes);//将msg消息读取到bytes中,并反序列化收到的消息
String string = new String(bytes,"UTF-8");
System.out.println("读取的服务端的数据为:" + string);
}
// 当连接发生异常时被调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("客户端异常处理。。。");
ctx.close();//关闭上下文,释放资源
}
}
网友评论