- 代码来源于netty官方样例
netty响应式编程模型 Reactor pattern
图片.png服务端代码
这个样例的关键在于服务端,服务端需要建立两个线程组bossGroup
和workerGroup
分别用于处理连接请求和真正的客户端业务,对应的模型如上图
服务端的关键代码分别是
1. ServerBootstrap bootstrap = new ServerBootstrap();
创建服务器端的启动对象
2. bootstrap.group(bossGroup, workerGroup) ...
链式的配置参数
3. ChannelFuture f = bootstrap.bind(PORT).sync();
绑定端口,启动
服务端需要一个ChannelHandler
来专注于业务逻辑的实现,只需要继承官方提供的标准Handler
,重写相应的方法,实现具体的逻辑即可。
此处修改官方样例,在与客户端连接成功时打印一条消息,在收到消息时将消息打印出来,并发送一条确认消息给客户端
package example.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* @author netty
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
// 创建两个线程组bossGroup和workerGroup,bossGroup只是处理连接请求,真正和客户端业务处理的是workerGroup
// NioEventLoopGroup的子线程数默认是CPU核数的2倍
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 关键代码 1:创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 关键代码 2:配置参数,链式
// 设置两个线程组
bootstrap.group(bossGroup, workerGroup)
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列的大小,服务器端处理客户端的连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
// 对workerGroup的SocketChannel绑定处理器
p.addLast(serverHandler);
}
});
// Start the server.
// 关键代码 3:绑定端口,启动
ChannelFuture f = bootstrap.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package example.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author netty
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接服务器就会触发该方法
*
* @param ctx 上下文,内含通道channel,管道pipeline
* @throws Exception 异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
Channel channel = ctx.channel();
System.out.println("客户端连接通道建立完成");
}
/**
* 读取客户端发送的数据
*
* @param ctx 上下文
* @param msg 客户端发送的数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ctx.write(msg);
ByteBuf buf = (ByteBuf) msg;
String msgStr = buf.toString(CharsetUtil.UTF_8);
Channel channel = ctx.channel();
System.out.println("收到客户端发送的消息---->" + msgStr);
channel.writeAndFlush(Unpooled.copiedBuffer("------>服务端回复,已经收到消息:" + msgStr, CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
*
* @param ctx 上下文
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
System.out.println("消息读取完毕........V");
ctx.flush();
}
/**
* 引发异常时处理
*
* @param ctx 上下文
* @param cause 错误
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("异常发生........X");
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
客户端
客户端写法和服务端差不多,不同的是启动的时候使用的是connect ->ChannelFuture channelFuture = b.connect(HOST, PORT).sync();
在连接后写一个Scanner
扫描控制台输入,来给客户端发消息
调整客户端的Handler,在channelActive
中给服务端发送一条连接成功的消息,在channelRead
中将收到的消息打印出来,并注释原本代码,免得形成了客户端发送服务端回复的死循环
package example.echo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.CharsetUtil;
import java.util.Scanner;
/**
* @author netty
*/
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture channelFuture = b.connect(HOST, PORT).sync();
// 客户端连接后会返回一个channel,可以用此与服务端通信
Channel channel = channelFuture.channel();
System.out.println("=================" + channel.localAddress() + "===============");
// 扫描控制台输入
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
channel.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
}
// Wait until the connection is closed.
channelFuture.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
package example.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
// private final ByteBuf firstMessage;
// /**
// * Creates a client-side handler.
// */
// public EchoClientHandler() {
// firstMessage = Unpooled.buffer(EchoClient.SIZE);
// for (int i = 0; i < firstMessage.capacity(); i++) {
// firstMessage.writeByte((byte) i);
// }
// }
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 连接成功后给客户端发送一条消息
ByteBuf firstMessage = Unpooled.copiedBuffer("客户端连接成功发送的第一条消息......OK", CharsetUtil.UTF_8);
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf bf = (ByteBuf) msg;
System.out.println(bf.toString(CharsetUtil.UTF_8));
// ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
实现效果
- 客户端
=================/127.0.0.1:54400===============
------>服务端回复,已经收到消息:客户端连接成功发送的第一条消息......OK
hello
------>服务端回复,已经收到消息:hello
你好
------>服务端回复,已经收到消息:你好
- 服务端
客户端连接通道建立完成
收到客户端发送的消息---->客户端连接成功发送的第一条消息......OK
消息读取完毕........V
收到客户端发送的消息---->hello
消息读取完毕........V
收到客户端发送的消息---->你好
消息读取完毕........V
参考学习: https://www.bilibili.com/video/BV1fA41157Ht?share_source=copy_web
netty官方文档:https://netty.io/wiki/index.html
netty官方样例:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html
网友评论