Netty是什么
Netty是一个高性能、高可扩展的异步事件驱动的网络应用程序框架,它极大的简化了TCP和UDP客户端和服务器开发等网络编程
Netty重要的四个内容
- Reactor线程模型:一种高性能的多线程程序设计思路
- Netty中自定义的Channel概念:增强版的通道概念
- ChannelPipeline职责链设计模式:事件处理机制
- 内存管理:增强的ByteBuf缓存区
Netty整体结构
netty结构图图片来自Netty官网 https://netty.io/
- 支撑Socket等多种传输方式
- 提供了多种协议的编码实现
- 核心设计包含事件处理模型、API的使用、ByteBuffer的增强
Netty EchoServer 代码示例
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.GenericFutureListener;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
public static void main(String[] args) throws Exception {
// Configure the server.
// 创建EventLoopGroup accept线程组 NioEventLoop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建EventLoopGroup I/O线程组
EventLoopGroup workerGroup2 = new NioEventLoopGroup(1);
try {
// 服务端启动引导工具类
ServerBootstrap b = new ServerBootstrap();
// 配置服务端处理的reactor线程组以及服务端的其他配置
b.group(bossGroup, workerGroup2).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// 通过bind启动服务
ChannelFuture f = b.bind(PORT).sync();
// 阻塞主线程,知道网络服务被关闭
f.channel().closeFuture().sync();
} finally {
// 关闭线程组
bossGroup.shutdownGracefully();
workerGroup2.shutdownGracefully();
}
}
}
-
bind():绑定端口
-
group() : 设定accept,客户端连接,所有I/O事件由谁处理
-
channel():创建具体的通道
-
handler():处理服务端通道请求
-
option():通道相关配置
-
childHandler():处理具体客户端socket连接后的请求
Netty线程模型
为了让NIO处理更好的利用多线程特性,Netty实现了Reactor线程模型;Reactor模型中有四个核心概念:
- Resources资源(请求/任务)
- Synchronous Event Demultiplexer 同步事件复用器
- Dispatcher分配器
- Request Handler 请求处理器
EventLoopGroup 事件轮询器
EventLoopGroup EventLoopChannel概念
netty中的Channel是一个抽象的概念,可以理解为对JDK NIO Channel的增强和拓展;
AbstractChannel常见的属性和方法
- Pipeline 通道内事件处理链路
- EventLoop 绑定的EventLoop,用于执行操作
- Unsafe 提供 I/O相关的封装操作
- config() 返回通道配置信息
- read() 开始读数据,触发读取链路调用
- write() 写数据,触发链路调用
- bind() 绑定
设计模式-责任链模式
发起请求和具体处理请求的过程解偶:职责链上的处理者负责处理请求,客户只需将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递
责任链模式实现责任链模式4个要素
- 处理器抽象类
- 具体的处理器实现类
- 保存处理器信息
- 处理执行
// -----链表形式调用------netty就是类似的这种形式
public class PipelineDemo {
/**
* 初始化的时候造一个head,作为责任链的开始,但是并没有具体的处理
*/
public HandlerChainContext head = new HandlerChainContext(new AbstractHandler() {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
handlerChainContext.runNext(arg0);
}
});
public void requestProcess(Object arg0) {
this.head.handler(arg0);
}
public void addLast(AbstractHandler handler) {
HandlerChainContext context = head;
while (context.next != null) {
context = context.next;
}
context.next = new HandlerChainContext(handler);
}
public static void main(String[] args) {
PipelineDemo pipelineChainDemo = new PipelineDemo();
pipelineChainDemo.addLast(new Handler2());
pipelineChainDemo.addLast(new Handler1());
pipelineChainDemo.addLast(new Handler1());
pipelineChainDemo.addLast(new Handler2());
// 发起请求
pipelineChainDemo.requestProcess("火车呜呜呜~~");
}
}
/**
* handler上下文,我主要负责维护链,和链的执行
*/
class HandlerChainContext {
HandlerChainContext next; // 下一个节点
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler) {
this.handler = handler;
}
void handler(Object arg0) {
this.handler.doHandler(this, arg0);
}
/**
* 继续执行下一个
*/
void runNext(Object arg0) {
if (this.next != null) {
this.next.handler(arg0);
}
}
}
// 处理器抽象类
abstract class AbstractHandler {
/**
* 处理器,这个处理器就做一件事情,在传入的字符串中增加一个尾巴..
*/
abstract void doHandler(HandlerChainContext handlerChainContext, Object arg0); // handler方法
}
// 处理器具体实现类
class Handler1 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler1的小尾巴.....";
System.out.println("我是Handler1的实例,我在处理:" + arg0);
// 继续执行下一个
handlerChainContext.runNext(arg0);
}
}
// 处理器具体实现类
class Handler2 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler2的小尾巴.....";
System.out.println("我是Handler2的实例,我在处理:" + arg0);
// 继续执行下一个
handlerChainContext.runNext(arg0);
}
}
Netty中的ChannelPipeline责任链
ChannelPipeline入站事件和出站事件
入站事件:通常指I/O线程成了入站数据
(通俗理解:从socket底层自己往上冒上来的事件都是入站)
比如EventLoop收selector的OP_READ事件,入站处理器调用socketChannel.read(ByteBuffer)接收到数据后,这将导致通道的ChannelPipeline中包含的下一个中的channelRead方法被调用
出站事件:经常指I/O线程执行实际的输出操作
(通俗理解:想主动往socket底层操作的事件都是出站)
比如bind方法用意是请求server socket绑定到给定的SocketAddress,这将导致通道的ChannelPipeline中包含的下一个出站处理器中的bind方法被调用
Netty中的事件定义
Netty中的事件定义Pipeline中的handler是什么
ChannelHandler
用于处理I/O事件或拦截I/O操作,并转发到ChannelPipeline中的下一个处理器
这个顶级接口定义功能很弱,实际使用时会去实现下面两大子接口:
- 处理入站I/O事件的ChannelInboundHandler
- 处理出站I/O操作的ChannelOutboundHandler
适配器类
为了开发方便,避免所有handler去实现一遍接口方法,Netty提供了简单实现类
- ChannelInboundHandlerAdapter处理入站I/O事件
- ChannelOutbundHandlerAdapter 处理出站I/O操作
- ChannelDuplexHandler 支持同时处理入站和出站事件
ChannelHandlerContext
实际存储在Pipeline中的对象并非ChannelHandler,而是上下文对象
将handler,包裹在上下文对象中,通过上下文对象与它所属的ChannelPipeline交互,向上或者向下传递事件,或者修改pipeline都是通过上下文对象
ChannelPipeline是线程安全的,ChannelHandler可以在任何时候添加或删除
handler apiHandler示例
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Arrays;
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("收到数据:" + ((ByteBuf)msg).toString(Charset.defaultCharset()));
ctx.write(Unpooled.wrappedBuffer("98877".getBytes()));
// ((ByteBuf) msg).release();
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
Handler的执行分析
Handler执行顺序.pngregistered入站事件的处理
registered入站事件的处理bind出站事件的处理
bind出站事件处理pipeline分析的关键4要素
- 什么事件
- 有哪些处理器
- 哪些会被触发
- 执行顺序
Netty 中的ByteBuf
JDK ByteBuffer的缺点:无法动态扩容,API使用复杂
ByteBuf增强
- API操作便捷性
- 动态扩容
- 多种ByteBuf实现
- 高效的零拷贝机制
ByteBuf 三个重要的属性
- capacity 容量
- readerIndex 读取位置
- writerIndex 写入位置
相比JDK中的一个指针,Netty中的ByteBuf提供了两个指针来支出顺序读和写操作
示例
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import java.util.Arrays;
/**
* bytebuf的常规API操作示例
*/
public class ByteBufDemo {
@Test
public void apiTest() {
// +-------------------+------------------+------------------+
// | discardable bytes | readable bytes | writable bytes |
// | | (CONTENT) | |
// +-------------------+------------------+------------------+
// | | | |
// 0 <= readerIndex <= writerIndex <= capacity
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf为====================>" + buf.toString());
System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 2.写入一段内容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 3.读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 4.将读取的内容丢弃
buf.discardReadBytes();
System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 5.清空读写指针
buf.clear();
System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 6.再次写入一段内容,比第一段内容少
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 7.将ByteBuf清零
buf.setZero(0, buf.capacity());
System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
// 8.再次写入一段超过容量的内容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 随机访问索引 getByte
// 顺序读 read*
// 顺序写 write*
// 清除已读内容 discardReadBytes
// 清除缓冲区 clear
// 搜索操作
// 标记和重置
// 完整代码示例:参考
// 搜索操作 读取指定位置 buf.getByte(1);
//
}
}
ByteBuf动态扩容
capacity默认值:256字节、最大值:Integer.MAX_VALUE()
容量计算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)
capacity的最小只要求,对应两套计算方案
- 没超过4兆,从64字节开始,每次增加一倍,直至计算出来的新容量满足最小要求
- 超过4兆,新容量=新容量最小要求 / 4兆 * 4兆 + 4兆
选择适合的ByteBuf实现
- Netty默认使用 堆外内存 pool 中 unsafe方式
- 如果自己使用,它推荐使用堆内 unpoll 的safe方式
Unsafe
unsafe意味着不安全的操作。但是更底层的操作会带来性能提升和特殊功能,Netty中会尽力使用unsafe
Java语言中很重要的特性是一次编写到处运行,所以它针对底层的内存或者其他操作,做了很多封装。而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容或者不可知的异常
PooledByteBuf对象、内存复用
PoolThreadCache: PooledByteBufAllocator实例维护的一个线程变量。
多种分类的MemoryRegionCache数组作用内存缓存,MemoryRegionCache内部是链表,队里里面存Chunk
PoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memory
PooledByteBufAllocator.ioBuff运作过程:
PooledByteBuf零拷贝机制
Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联
- 将多个ByteBuf合并成为一个逻辑上的ByteBuf,内存中原始数据不变
- 将byte[]数组包装成ByteBuf对象
- 将一个ByteBuf切分成多个对象
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
/**
* 零拷贝示例
*/
public class ZeroCopyTest {
// 包装
@org.junit.Test
public void wrapTest() {
byte[] arr = {1, 2, 3, 4, 5};
ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
System.out.println(byteBuf.getByte(4));
arr[4] = 6;
System.out.println(byteBuf.getByte(4));
}
// 拆分
@org.junit.Test
public void sliceTest() {
ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
System.out.println(newBuffer.toString());
}
// 合并
@org.junit.Test
public void compositeTest() {
ByteBuf buffer1 = Unpooled.buffer(3);
buffer1.writeByte(1);
ByteBuf buffer2 = Unpooled.buffer(3);
buffer2.writeByte(4);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
System.out.println(newBuffer);
}
}
网友评论