-
1.netty reactor工作机制
image.png
- 2.netty启动源码剖析
-3.netty接受请求源码剖析
-
关于BIO
同步阻塞,服务器实现模式为一个连接一个线程,可以通过线程池机制改善(实现多个客户连接服务器)
BIO编程简单流程:
1.服务器端启动一个ServerSocket
2.客户端启动Socket对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通信
3.客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
4.如果有响应,客户端线程会等待请求结束后,再继续执行
- BIO案例:
1.使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时,启动一个线程与之通信
2.要求使用线程池机制改善,可以连接多个客户端
3.服务器端可以接收客户端发送的数据(telnet)
-
NIO
non-blocking IO , 三大核心
Channel(通道)
Buffer(缓冲区)
Selector(选择器)
BIO以流的方式处理数据,NIO以块的方式。
NIO基于Channel通道和Buffer缓冲区 进行操作,数据总是从通道读到缓冲区,或者从缓冲区写入到通道
Selector选择器用于监听多个通道的事件,比如:连接请求、数据到达。因此使用单个线程就能监听多个客户端通道
BIO时阻塞的,NIO是非阻塞的
NIO三大核心组件 关系图
image.png
1.每个channel都会对应一个buffer
2.一个Selector对应一个线程,一个线程对应多个channel
3.Channel需要被注册到Selector中
4.程序切换到哪个channel是由事件决定的(事件驱动)
5.Selector会根据不同的事件在各个Channel通道上切换
6.Buffer就是一个内存块,底层是有一个数组
7.数据的读取或写入是通过Buffer,BIO中要么是输入流,要么是输出流,不能双向。但是NIO的Buffer可以读写,需要使用flip方法切换
8.channel是双向的,可以返回
- 缓冲区
本质上是一个可以读写数据的内存块,可以理解成一个容器对象(含数组),该对象提供了一组方法,可以更轻松的使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经过Buffer
image.png
在NIO中,Buffer是一个顶层父类,是一个抽象类
常用Buffer子类一览
ByteBuffer
:存储字节数据到缓冲区
ShortBuffer
:存储字符串数据到缓冲区
CharBuffer
:存储字符数据到缓冲区
IntBuffer
:存储整数数据到缓冲区
LongBuffer
:存储长整形数据到缓冲区
DoubleBuffer
:存储小数数据到缓冲区
FloatBuffer
:存储小数数据到缓冲区
四个常用属性:
Capacity
:可以容纳的最大数据量,在缓冲区创建时被设定并且不能改变
Limit
:缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
Position
:位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变该值
Mark
:标记
- Channel
Channel在NIO中是一个借口用,常见的Channel类由:
FileChannel
、DatagramChannel
、ServerSocketChannel
、SocketChannel
FileChannel
:用于文件的数据读写
DatagramChannel
:用于UDP的数据读写
ServerSocketChannel
和SocketChannel
用于TCP的数据读写
FileChannel案例1(往本地test.txt写入数据)
image.png
public static void main(String[] args) {
// 将字符串 "hello 中华人民共和国!" 写入到文件中
try (FileOutputStream fileOutputStream = new FileOutputStream("/Users/yuliang/IdeaProjects/NettyProject/src/test.txt");
FileChannel fileChannel = fileOutputStream.getChannel();)
{
String str = "hello 中华人民共和国!";
// 超级缓冲区
// byte[] bytes = new byte[1024];
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将str放入缓冲区
byteBuffer.put(str.getBytes());
// 对byteBuffer flip
byteBuffer.flip();
//将byteBuffer数据写入到fileOutputStream
fileChannel.write(byteBuffer);
} catch (FileNotFoundException e){
e.printStackTrace();
} catch (IOException e){
e.printStackTrace();
}
}
- FileChannel案例2(本地test.txt读数据)
public static void main(String[] args) {
// 创建文件的输入流
File file = new File("/Users/yuliang/IdeaProjects/NettyProject/src/test.txt");
try (FileInputStream fileInputStream = new FileInputStream(file);){
// 通过FileInputStream获取对应的FileChannel(实际类型FileChannelImpl)
FileChannel fileChannel = fileInputStream.getChannel();
// 创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length());
//将通道的数据读入到buffer
fileChannel.read(byteBuffer);
// 将byteBuffer字节数据转为String
System.out.println(new String(byteBuffer.array()));
} catch (FileNotFoundException e){
e.printStackTrace();
} catch (IOException e){
e.printStackTrace();
}
}
输出: hello 中华人民共和国!
FileChannel案例3(使用一个buffer完成文件的读取和写入)
方案:使用FileChannel和方法read、write完成文件的拷贝。拷贝一个文本文件2.txt放到项目下即可
image.png
public static void main(String[] args) {
try (FileInputStream fileInputStream = new FileInputStream("/Users/yuliang/IdeaProjects/NettyProject/src/test.txt");
FileOutputStream fileOutputStream = new FileOutputStream("/Users/yuliang/IdeaProjects/NettyProject/src/test2.txt")
){
FileChannel fileInputChannel = fileInputStream.getChannel();
FileChannel fileOutputChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while(true){
byteBuffer.clear();
int read = fileInputChannel.read(byteBuffer);
if (read == -1){
break;
}
byteBuffer.flip();
// 将buffer中的数据写入到fileOutputChannel
fileOutputChannel.write(byteBuffer);
}
}catch (FileNotFoundException e){
e.printStackTrace();
} catch (IOException e){
e.printStackTrace();
}
}
FileChannel案例4(使用FileChannel和transferFrom拷贝文件(图片))
public static void main(String[] args) {
try (FileInputStream fileInputStream = new FileInputStream("/Users/yuliang/IdeaProjects/NettyProject/src/3.jpg");
FileOutputStream fileOutputStream = new FileOutputStream("/Users/yuliang/IdeaProjects/NettyProject/src/noexist.jpg");
FileChannel fileInputChannel = fileInputStream.getChannel();
FileChannel fileOutputChannel = fileOutputStream.getChannel();
){
fileOutputChannel.transferFrom(fileInputChannel, 0, fileInputChannel.size());
} catch (FileNotFoundException e){
e.printStackTrace();
} catch (IOException e){
e.printStackTrace();
}
}
- Buffer和Channel的注意细节:
1.ByteBuffer支持类型化的put和get,存入啥类型取出啥类型
2.NOI提供了MappedByteBuffer,可以让文件直接存哉内存(堆外的内存)中进行修改,而如何同步到文件由NIO完成
3.NIO还支持通过多个buffer(buffer数组)完成读写操作,即Scattering和Gathering
MappedByteBuffer的使用
public static void main(String[] args) {
try(RandomAccessFile randomAccessFile = new RandomAccessFile("/Users/yuliang/IdeaProjects/NettyProject/src/test.txt", "rw");
FileChannel fileChannel = randomAccessFile.getChannel();
) {
/**
* 1.读写模式: FileChannel.MapMode.READ_WRITE
* 2.position: 可以直接修改的起始位置
* 3.size:是映射到内存的大小(不是索引的位置),可以将多少个字节映射到内存,可以直接修改的范围0-5
*/
MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 16);
// 原始数据: "hello 中华人民共和国!"
// 修改后数据:"Hello-中华人民共和国!"
byteBuffer.put(0, (byte) 'H');
byteBuffer.put(5, (byte) '-');
} catch (FileNotFoundException e){
e.printStackTrace();
} catch (IOException e){
e.printStackTrace();
}
}
- Selector选择器
1.Java的NIO用到非阻塞IO方式,可以用一个线程,处理多个大的客户端连接,就会使用到selector选择器
2.Selector能够检测多个注册的通道上是否有时间发生(多个channel以事件的方式可以注册到同一个Selector),如果有事件发生便获取事件然后进行处理,这样可以只使用一个单线程去管理多个通道,也就是管理多个连接和请求
3.只有在连接正真有读写事件发生时,才会进行读写,大大减少了系统开销,不必为每个连接都创建一个线程,不用维护多个线程
4.避免了多个线程之间的上下文切换导致的开销
5.NIO中的ServerSocketChannel功能类似ServerSocket,SocketChannel功能类似Socket
NIO非阻塞网络编程原理分析图
NIO非阻塞网络编程相关的(Selector、SelectionKey、ServerSocketChannel和SocketChannel) 关系梳理图
image.png
图解:
1.当客户端连接时,会通过ServerSocketChannel得到SocketChannel
2.Selector进行监听(select方法), 返回有事件发生的通道的个数
3.将SocketChannel注册到Selector上(register方法),一个Selector可以注册多个SocketChannel
4.注册后返回一个SelectionKey,会和该Selector关联(集合)
5.进一步得到有事件发生的SelectionKey
6.再通过SelectionKey反向获取SocketChannel
7.通过得到的SocketChannel完成业务处理
案例:编写一个NIO入门案例,实现客户器端和客户端之间的数据简单通讯(非阻塞)
# 客户端
package cn.netty.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NIOClient {
public static void main(String[] args) {
//得到一个网络通道
try (SocketChannel socketChannel = SocketChannel.open();){
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 服务器的ip、端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if(!socketChannel.connect(inetSocketAddress)){
while(!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作");
}
}
// 连接成功, 发送数据
String data = "hello wudy!";
ByteBuffer byteBuffer = ByteBuffer.wrap(data.getBytes());
// 发送数据,将buffer的数据写入channel
socketChannel.write(byteBuffer);
System.in.read();
} catch (IOException e){
e.printStackTrace();
}
}
}
# 服务端
package cn.netty.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
public static void main(String[] args) {
//1.创建ServerSocketChannel
//2.拿到Selector
try(ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
){
//绑定一个端口,在服务器监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 把serverSocketChannel注册到Selector, 关心OP_ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
if (selector.select(1000) == 0){
System.out.println("服务器等待了1秒,无连接");
continue;
}
// 获取到了关注的OP_ACCEPT事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
if (iterator.hasNext()){
// 获取SelectionKey
SelectionKey selectionKey = iterator.next();
// 根据key对应的通道发生的事件进行不同逻辑处理
if (selectionKey.isAcceptable()){// 如果是OP_ACCEPT,表示有新的客户连接
// 该客户端生成一个SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 将当前的socketChannel注册到Selector,关注事件为OP_READ,同时给socketChannel关联一个buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
} else if (selectionKey.isReadable()){ // 发送OP_READ
//通过key,反向获取到对应channel
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
//获取到该channel关联的buffer
ByteBuffer byteBuffer = (ByteBuffer)selectionKey.attachment();
socketChannel.read(byteBuffer);
System.out.println("from 客户端 " + new String(byteBuffer.array()));
}
// 手动从集合中移除当前的selectionKey,防止重复操作
iterator.remove();
}
}
} catch (IOException e){
e.printStackTrace();
}
}
}
SelectionKey
相关方法:
1.Selector selector() 得到与之关联的Selector对象
2.SelectableChannel channel() 得到与之关联的通道
3.Object attachment() 得到与之关联的共享数据
4.SelectionKey interestOps(int ops) 设置或改变监听事件
5.boolean isAcceptable()
6.boolean isWritable()
7.boolean isWritable()
ServerSocketChannel
在服务器端监听新的客户端Socket连接
相关方法:
1.ServerSocketChannel open() 得到一个ServerSocketChannel通道
2.ServerSocketChannel bind(SocketAddress local) 设置服务器端端口号
3.SelectableChannel configureBlocking(boolean block) 设置阻塞或非阻塞模式,false表示非阻塞模式
4.SocketChannel accept() 接收一个连接,返回代表这个连接的通道对象
5.SelectionKey register(Selector sel, int ops) 注册一个选择器并设置监听事件
SocketChannel
IO网络通道,具体负责读写操作,NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区
1.SocketChannel open() 得到一个SocketChannel通道
2.boolean connect(SocketAddress remote) 连接服务器
3.boolean finishConnect() 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
4.int write(ByteBuffer src) 往通道里写数据
5.int read(ByteBuffer dst) 从通道里读数据
6.SelectionKey register(Selector sel,int ops , Object att) 注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
7.void close() 关闭通道
案例: 群聊系统
实例要求:
1.编写一个NIO群聊系统,实现服务器端和客户端之间的简单通信(非阻塞)
2.实现多人群聊
3.服务器端:可以监测用户上线,离线,并实现消息转发功能
4.客户端:通过channel可以无阻塞发送消息给其他所有用户,同时可以接收其他用户发送的消息(由服务器转发得到)
image.pngNetty是一个异步的基于事件驱动的网络应用框架
三大核心:
可扩展的事件模型(Extension Event Model)
API
零拷贝
线程模型:
1.目前存在的现场模型有:传统阻塞I/O服务模型、Reactor模式
2.根据Reactor的数量和处理资源池线程的数量不同,有3种典型的实现:
单Reactor单线程
单Reactor多线程
主从Reactor多线程
3.Netty线程模式(Netty主要基于主从多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor)
-
传统阻塞IO服务模型
image.png
Reactor模式(反应器模式 也叫 分发者模式Dispatcher)
- 针对传统阻塞IO服务模型的2个缺点,解决方案:
1.基于IO复用模型:多个连接公用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
2.基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务
image.png image.png说明:
1.Reactor模式通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
2.服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式
3.Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(这就是网络服务器高并发处理的关键)
-
单Reactor单线程模型
单线程在应对高并发的时候显得力不从心
image.png方案说明:
1.Select是前面I/O复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求
2.Reactor对象通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发,
3.如果是建立连接的请求,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
4.如果不是建立连接请求,则Reactor会分发调用对应的Handler来响应
5.Handler会完成Read->业务处理->Send的晚餐业务流程
-
单Reactor多线程
-
主从Reactor多线程模型
-
Netty模型
-
TaskQueue自定义任务
任务队列中的task有3种经典使用场景
1.用户程序自定义的普通任务
image.png
2.用户自定义定时任务
// 用户自定义定时任务(提交到scheduleTaskQueue)
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000); // 睡眠5秒
ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
} catch (Exception e){
System.out.println("捕获异常=" + e.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000); // 睡眠5秒
ctx.writeAndFlush(Unpooled.copiedBuffer("third hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
} catch (Exception e){
System.out.println("捕获异常=" + e.getMessage());
}
}
}, 10, TimeUnit.SECONDS);
3.非当前Reactor线程调用Channel的各种方法
例如在推送系统的业务线程里面,根据用户的标示找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景,。最终的write会提交到任务队列中后被异步消费
-
异步模型
image.pngFuture说明
- 1.他表示异步的执行结果,可以通过它提供的方法来监测执行是否完成,比如检索计算等
- 2.ChannelFuture是一个接口,
public interface ChannelFuture extends Future<Void>
, 我们可以添加监听起,当监听的事件发生时,就会通知到监听器
链式操作示意图
image.png
-
异步模型
image.png
// 示例
//绑定一个端口并且同步生成了一个ChannelFuture对象
// 启动服务器并启动监听
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
// 给channelFuture注册监听器,管控我们关心的事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out. println("监听端口6668成功");
} else {
System.out.println("监听失败");
}
}
});
image.png
-
快速入门实例
package cn.netty.http.server;
import cn.netty.http.handler.ServerHandlerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HttpServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ServerHandlerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(3169).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// HttpServerHandler
package cn.netty.http.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.net.URI;
/**
* 说明:
* 1.SimpleChannelInboundHandler是ChannelInboundHandlerAdapter
* 2.HttpObject客户端和服务器端相互通讯的数据被封装成HttpObject
*/
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
// channelRead0 读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
// 判断httpObject是不是httprequest请求
if (httpObject instanceof HttpRequest) {
// 不同的浏览器和相同浏览器每次刷新 都会是不一样的hashCoed
System.out.println("pipeline hashcode=" + channelHandlerContext.pipeline().hashCode() + "HttpServerhandler hashCOde=" + this.hashCode());
System.out.println("httpObject 类型=" + httpObject.getClass());
System.out.println("客户端地址=" + channelHandlerContext.channel().remoteAddress());
// 获取HttpRequest
HttpRequest httpRequest = (HttpRequest) httpObject;
// 获取uri
URI uri = new URI(httpRequest.getUri());
if ("/favicon.ico".equals(uri.getPath())){
System.out.println("被拦截的请求,不做处理");
return;
}
// 回复信息给浏览器
ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器!", CharsetUtil.UTF_8);
// 构造一个http的响应,即httpResponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 将构建好的response返回
channelHandlerContext.writeAndFlush(response);
}
}
}
// ServerInitializer
package cn.netty.http.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
public class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//向管道加入处理器
//得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个netty提供的HttpServerCodec Codec = [coder - decoder]
// HttpServerCodec: 是Netty提供的基于http的编码解码器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
// 增加一个自定义Handler
pipeline.addLast("MyHttpServerHandler", new HttpServerHandler());
}
}
网友评论