1. Netty 自己的ByteBuf
ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。
- JDK ByteBuffer的缺点:
- 无法动态扩容
长度固定,无法动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
- API使用复杂
读写的时候需要手工调用flip() 和 rewind() 等方法,使用时需要非常谨慎的使用这些api,否则很容易出现错误。
2. ByteBuf做了哪些增强
- API操作便捷
- 动态扩容
- 多种ByteBuf实现
- 高效的零拷贝机制
3. ByteBuf 操作
ByteBuf三个重要属性:capacity容量、readerIndex读取位置、writerlndex写入位置。
提供了两个指针变量来支持顺序读和写操作,分别是readerlndex和写操作writerIndex。
- 常用方法定义:
- 随机访问索引 getByte
- 顺序读 read*
- 顺序写 write*
- 清除已读内容 discardReadBytes
- 清除缓冲区 clear
- 搜索操作
- 标记和重置
- 引用计数和释放
- 下图显示了一个缓冲区是如何被两个指针分割成三个区域的:
- 一个完整的示例:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import java.util.Arrays;
/**
* bytebuf的常规API操作示例
*/
public class ByteBufDemo {
@Test
public void apiTest() {
// +-------------------+------------------+------------------+
// | discardable bytes | readable bytes | writable bytes |
// | | (CONTENT) | |
// +-------------------+------------------+------------------+
// | | | |
// 0 <= readerIndex <= writerIndex <= capacity
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf为====================>" + buf.toString());
System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 2.写入一段内容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 3.读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 4.将读取的内容丢弃
buf.discardReadBytes();
System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 5.清空读写指针
buf.clear();
System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 6.再次写入一段内容,比第一段内容少
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 7.将ByteBuf清零
buf.setZero(0, buf.capacity());
System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
// 8.再次写入一段超过容量的内容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 随机访问索引 getByte
// 顺序读 read*
// 顺序写 write*
// 清除已读内容 discardReadBytes
// 清除缓冲区 clear
// 搜索操作
// 标记和重置
// 完整代码示例:参考
// 搜索操作 读取指定位置 buf.getByte(1);
//
}
}
- 执行结果
原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3, 4, 5]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
读取的bytes为====================>[1, 2]
读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
4. ByteBuf 动态扩容
capacity默认值: 256 字节、最大值:Integer.MAX_VALUE(2GB)
write* 方法调用时,通过AbstractByteBuf.ensureWriteable0 进行检查。
容量计算方法:AbstractByteBufAllocator.calculateNewCapacity (新capacity的最小要求,capacity最大值)
根据新capacity的最小值要求,对应有两套计算方法:
-
没超过4兆:从64字节开始,每次增加一倍,直至计算出来的newCapacity满足新容量最小要求。
示例:当前大小256,已写250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512 -
超过4兆:新容量=新容量最小要求/4兆*4兆+4兆
示例:当前大小3兆,已写3兆,继续写2兆数据,需要的容量最小要求是5兆,则新容量是9兆〈不能超过最大值)。 -
4兆的来源:一个固定的阈值AbstractByteBufAllocator.CALCULATE_THRESHOLD
5. 选择合适的ButeBuf 实现
了解核心的: 3个维度的划分方式,8 种具体实现
在使用中,都是通过ByteBufAllocator分配器进行申请,同时分配器具备有内存管理的功能
Netty 中默认使用的是 PooledUnsafeDirectByteBuf,但是推荐我们使用的是UnpooledHeapByteBuf。
- 例子
package com.study.hc.net.netty.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import java.util.Arrays;
/**
* 堆外内存的常规API操作示例
*/
public class DirectByteBufDemo {
@Test
public void apiTest() {
// +-------------------+------------------+------------------+
// | discardable bytes | readable bytes | writable bytes |
// | | (CONTENT) | |
// +-------------------+------------------+------------------+
// | | | |
// 0 <= readerIndex <= writerIndex <= capacity
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.directBuffer(10);
System.out.println("原始ByteBuf为====================>" + buf.toString());
// 下面的这个没有实现,所以无法正常运行
// System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 2.写入一段内容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
//System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 3.读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
//System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 4.将读取的内容丢弃
buf.discardReadBytes();
System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
//System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 5.清空读写指针
buf.clear();
System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
//System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 6.再次写入一段内容,比第一段内容少
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
// System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 7.将ByteBuf清零
buf.setZero(0, buf.capacity());
System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
// System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
// 8.再次写入一段超过容量的内容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
// System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 随机访问索引 getByte
// 顺序读 read*
// 顺序写 write*
// 清除已读内容 discardReadBytes
// 清除缓冲区 clear
// 搜索操作
// 标记和重置
// 完整代码示例:参考
// 搜索操作 读取指定位置 buf.getByte(1);
//
}
}
在使用中,都是通过ByteBufAllocator 分配器进行申请,同时分配器具备有内存管理的功能。
6. Unsafe的实现
unsafe意味着不安全的操作。但是更底层的操作会带来性能提升和特殊功能,Netty中会尽力使用unsafe.
Java语言很重要的特性是“一次编写到处运行”,所以它针对底层的内存或者其他操作,做了很多封装。而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容或者不可知的异常。
7. PooledByteBuf对象、内存复用
PoolThreadCache : PooledByteBufAllocator实例维护的一个线程变量。
多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存ChunkoPoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memoryo
PooledByteBufAllocator.ioBuffer运作过程梳理:
8. 零拷贝机制
Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联。
1、CompositeByteBuf,将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。
2、wrapedBuffer()方法,将byte[]数组包装成ByteBuf对象。
3、slice()方法。将一个ByteBuf对象切分成多个ByteBuf对象。
使用ByteBuf是Netty高性能很重要的一个原因!
- 示例
package com.study.hc.net.netty.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
/**
* 零拷贝示例
*/
public class ZeroCopyTest {
@org.junit.Test
public void wrapTest() {
byte[] arr = {1, 2, 3, 4, 5};
ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
System.out.println(byteBuf.getByte(4));
arr[4] = 6;
System.out.println(byteBuf.getByte(4));
}
@org.junit.Test
public void sliceTest() {
ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
System.out.println(newBuffer.toString());
}
@org.junit.Test
public void compositeTest() {
ByteBuf buffer1 = Unpooled.buffer(3);
buffer1.writeByte(1);
ByteBuf buffer2 = Unpooled.buffer(3);
buffer2.writeByte(4);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
System.out.println(newBuffer);
}
}
如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~
网友评论