Netty是互联网中间件领域使用最广泛最核心的网络通信框架,几乎所有互联网中间件或者大数据领域均离不开Netty。
参考https://www.jianshu.com/p/7522bda72a25
IO模型
OIO方式
下面是通过OIO(old IO)实现的Server-Client方式,可以看到效率低下
public class IOServer {
/**
* Server服务端首先创建ServerSocket监听8000端口,然后创建线程不断调用阻塞方法 serversocket.accept()获取新的连接,当获取到新的连接给每条连接创建新的线程负责从该连接中读取数据,然后读取数据是以字节流的方式
*
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8000);
//接收新连接线程
new Thread(() -> {
try {
//(1)阻塞方法获取新的连接
Socket socket = serverSocket.accept();
//(2)每一个新的连接都创建一个线程,负责读取数据
new Thread(() -> {
try {
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int len;
//(3)按照字节流方式读取数据
while ((len = inputStream.read(data)) != -1)
System.out.println(new String(data, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
public class IOClient {
/**
* Client客户端连接服务端8000端口每隔2秒向服务端写带有时间戳的 "hello world"
*
* @param args
*/
public static void main(String[] args) {
new Thread(() -> {
try {
Socket socket = new Socket("127.0.0.1", 8000);
while (true) {
try {
socket.getOutputStream().write((new Date() + ": hello world").getBytes());
socket.getOutputStream().flush();
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
传统的IO模型每个连接创建成功都需要一个线程来维护,每个线程包含一个while死循环,那么1w个连接对应1w个线程,继而1w个while死循环带来如下几个问题:
- 线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起;
- 线程切换效率低下:单机CPU核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降;
- 数据读写是以字节流为单位效率不高:每次都是从操作系统底层一个字节一个字节地读取数据.
NIO方式
-
线程资源受限:NIO编程模型新来一个连接不再创建一个新的线程,把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由该线程来负责.把这么多while死循环变成一个死循环,这个死循环由一个线程控制,一条连接来了,不创建一个while死循环去监听是否有数据可读,直接把这条连接注册到Selector上,然后通过检查Selector批量监测出有数据可读的连接进而读取数据.
io模型.png
nio模型.png - 线程切换效率低下:线程数量大大降低,线程切换效率因此也大幅度提高.
- 数据读写是以字节流为单位效率不高:NIO维护一个缓冲区每次从这个缓冲区里面读取一块的数据,数据读写不再以字节为单位,而是以字节块为单位.
public class NIOServer {
/**
* serverSelector负责轮询是否有新的连接,clientSelector负责轮询连接是否有数据可读.
* 服务端监测到新的连接不再创建一个新的线程,而是直接将新连接绑定到clientSelector上,这样不用IO模型中1w个while循环在死等
* clientSelector被一个while死循环包裹,如果在某一时刻有多条连接有数据可读通过 clientSelector.select(1)方法轮询出来进而批量处理
* 数据的读写以内存块为单位
*
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();
new Thread(() -> {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8000));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 轮询监测是否有新的连接
if (serverSelector.select(1) > 0) {
Set<SelectionKey> selectionKeys = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isAcceptable()) {
try {
//(1)每来一个新连接不需要创建一个线程而是直接注册到clientSelector
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
socketChannel.configureBlocking(false);
socketChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
while (true) {
// (2)批量轮询是否有哪些连接有数据可读
if (clientSelector.select(1) > 0) {
Set<SelectionKey> selectionKeys = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isReadable()) {
try {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//(3)读取数据以块为单位批量读取
socketChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
Netty编程方式
public class NettyServer {
/**
* 1.boss对应,IOServer.java中的接受新连接线程,主要负责创建新连接
* 2.worker对应 IOClient.java中的负责读取数据的线程,主要用于读取数据以及业务逻辑处理
*
* @param args
*/
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
});
}
}).bind(8000);
}
}
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while (true) {
channel.writeAndFlush(new Date() + ": hello world!");
Thread.sleep(2000);
}
}
}
服务端启动流程
要启动Netty服务端,必须要指定三类属性,分别是线程模型、IO模型、连接读写处理逻辑.
Netty服务端启动流程:创建引导类->指定线程模型、IO模型、连接读写处理逻辑->绑定端口.
服务端启动流程.png
/**
* 01: 服务端启动流程介绍[https://www.jianshu.com/p/ec3ebb396943]
* 要启动Netty服务端,必须要指定三类属性,分别是线程模型、IO模型、连接读写处理逻辑
* Netty服务端启动的流程是创建引导类给引导类指定线程模型,IO模型,连接读写处理逻辑,绑定端口之后服务端就启动起来
* bind方法是异步的通过异步机制来实现端口递增绑定
* Netty服务端启动额外的参数,主要包括给服务端channel或者channel设置属性值,设置底层TCP参数
*/
public class NettyServer {
private static final int BEGIN_PORT = 8000;
private static final AttributeKey<Object> SERVER_NAME_KEY = AttributeKey.newInstance("serverName");
private static final String SERVER_NAME_VALUE = "nettyServer";
public static final AttributeKey<Object> CLIENT_KEY = AttributeKey.newInstance("clientKey");
public static final String CLIENT_VALUE = "clientValue";
/**
* 创建两个NioEventLoopGroup,这两个对象可以看做是传统IO编程模型的两大线程组,boosGroup表示监听端口,创建新连接的线程组,workerGroup表示处理每一条连接的数据读写的线程组
* 创建引导类 ServerBootstrap进行服务端的启动工作,通过.group(boosGroup, workerGroup)给引导类配置两大线程定型引导类的线程模型指定服务端的IO模型为NIO,通过.channel(NioServerSocketChannel.class)来指定IO模型
* 调用childHandler()方法给引导类创建ChannelInitializer定义后续每条连接的数据读写,业务处理逻辑,泛型参数NioSocketChannel是Netty对NIO类型的连接的抽象,而NioServerSocketChannel也是对NIO类型的连接的抽象
* serverBootstrap.bind()是异步的方法调用之后是立即返回的,返回值是ChannelFuture,给ChannelFuture添加监听器GenericFutureListener,在GenericFutureListener的operationComplete方法里面监听端口是否绑定成功
* childHandler()用于指定处理新连接数据的读写处理逻辑,handler()用于指定在服务端启动过程中的一些逻辑
* attr()方法给服务端的channel即NioServerSocketChannel指定一些自定义属性,通过channel.attr()取出该属性,给NioServerSocketChannel维护一个map
* childAttr()方法给每一条连接指定自定义属性,通过channel.attr()取出该属性
* childOption()方法给每条连接设置一些TCP底层相关的属性:
* ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true为开启
* ChannelOption.SO_REUSEADDR表示端口释放后立即就可以被再次使用,因为一般来说,一个端口释放后会等待两分钟之后才能再被使用
* ChannelOption.TCP_NODELAY表示是否开始Nagle算法,true表示关闭,false表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互就开启
* option()方法给服务端channel设置一些TCP底层相关的属性:
* ChannelOption.SO_BACKLOG表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,适当调大该参数
*
* @param args
*/
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new ChannelInitializer<ServerSocketChannel>() {
@Override
protected void initChannel(ServerSocketChannel ch) throws Exception {
System.out.println("服务端启动中");
System.out.println(ch.attr(SERVER_NAME_KEY).get());
}
})
.attr(SERVER_NAME_KEY, SERVER_NAME_VALUE)
.option(ChannelOption.SO_BACKLOG, 1024)
.childAttr(CLIENT_KEY, CLIENT_VALUE)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println(ch.attr(CLIENT_KEY).get());
}
});
bind(serverBootstrap, BEGIN_PORT);
}
private static void bind(ServerBootstrap serverBootstrap, int port) {
serverBootstrap.bind(BEGIN_PORT).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
bind(serverBootstrap, port + 1);
}
}
});
}
}
客户端启动流程
要启动Netty客户端,必须要指定三类属性,分别是线程模型、IO模型、连接读写处理逻辑.
Netty客户端启动流程:创建引导类->指定线程模型、IO模型、连接读写处理逻辑->建立连接.
客户端启动流程.png
失败重连通过connect()异步回调机制实现指数退避重连逻辑:
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
客户端与服务端双向通信
客户端/服务端连接读写逻辑处理均是启动阶段通过给逻辑处理链 Pipeline 添加逻辑处理器实现连接数据的读写逻辑.
客户端连接成功回调逻辑处理器channelActive()方法,客户端/服务端接收连接数据调用channelRead()方法.
写数据调用writeAndFlush()方法,客户端与服务端交互的二进制数据传输载体为ByteBuf,ByteBuf通过连接的内存管理器创建即ctx.alloc().buffer(),通过writeBytes()方法将字节数据填充到ByteBuf 写到对端.
双向通信.png
网友评论