nio是基于事件驱动模型的非阻塞io,这篇文章简要介绍了nio,本篇主要介绍Buffer的实现原理。
Buffer
是一块缓冲区,通常使用buffer读写数据为:
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
//1.create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);
//2.write into buffer
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
//3.make buffer from write mode to read mode
buf.flip();
while(buf.hasRemaining()){
//4. read 1 byte from buffer
System.out.print((char) buf.get());
}
//5.调用clear()方法或者compact()方法,make buffer ready for writing
buf.clear();
bytesRead = inChannel.read(buf);
}
aFile.close();
Buffer的数据结构设计如下:
其中:
capacity
:buffer的固定大小值position
:在写模式下,表示当前写入数据的位置。在读模式下,表示当前已读到数据的位置limit
:在写模式下,表示最大可写的位置,为capacity ,在读模式下,表示最大可读位置。此外,Buffer类中还有以下参数:
mark
:初始值为-1,用于备份当前的position。address
:buffer对象持有的堆外内存(DirectByteBuffer)的内存地址,方便JNI 快速找到堆外内存地址。Buffer相关的类结构如下:
Buffer类结构.png
Buffer或ByteBuffer的方法简介:
1.Buffer的分配:
//分配一份堆内内存
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
//分配一份堆外内存
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
2.向buffer写入数据:
//各种put方法,也可从channel中读取,inChannel.read(buf)
put(....)
3.从buffer中读取数据
//也可以这样写入channel。 int bytesWritten = inChannel.write(buf);
get()
4.flip():将Buffer从写模式切换到读模式
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
5.rewind():将position设回0,所以你可以重读Buffer中的所有数据
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
6.clear()与compact():一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成。
compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
7.mark()与reset()
通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。方便数据的重新读取,而流只能是单向读取。
ByteBuffer的两种实现:
HeapByteBuffer:Java中分配的非空对象都是由Java虚拟机的垃圾收集器管理的,也称为堆内内存(on-heap memory)。虚拟机会定期对垃圾内存进行回收,在某些特定的时间点,它会进行一次彻底的回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,这意味着一个重要的事实——这样一次垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。JVM参数中 -Xmx
的值是新生代和老生代的和的最大值,我们在jvm参数里通常还会加一个参数-XX:MaxPermSize
来指定持久代的最大值,那么我们认识的Java堆的最大值其实是-Xmx和-XX:MaxPermSize的总和,在分代算法下,新生代,老生代和持久代是连续的虚拟地址,因为它们是一起分配的。
DirectByteBuffer:由该对象创建的在jvm之外的内存,对于生命期中等或较长的对象,正是堆外内存要解决的。堆外内存有以下特点:
对于大内存有良好的伸缩性
对垃圾回收停顿的改善可以明显感觉到
在进程间可以共享,减少虚拟机间的复制
接下来看一下DirectByteBuffer的实现:
DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
//确认堆外内存是否够用
Bits.reserveMemory(size, cap);
long base = 0;
try {
//分配堆外内存,并返回堆外内存的基地址
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
Bits.reserveMemory(size, cap) 方法,该方法用于在系统中保存总分配内存(按页分配)的大小和实际内存的大小。
// These methods should be called whenever direct memory is allocated or
// freed. They allow the user to control the amount of direct memory
// which a process may access. All sizes are specified in bytes.
static void reserveMemory(long size, int cap) {
if (!memoryLimitSet && VM.isBooted()) {
maxMemory = VM.maxDirectMemory();
memoryLimitSet = true;
}
// optimist!内存够用!
if (tryReserveMemory(size, cap)) {
return;
}
final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();
// retry while helping enqueue pending Reference objects
// which includes executing pending Cleaner(s) which includes
// Cleaner(s) that free direct buffer memory
while (jlra.tryHandlePendingReference()) {
if (tryReserveMemory(size, cap)) {
return;
}
}
// trigger VM's Reference processing
System.gc();
// a retry loop with exponential back-off delays
// (this gives VM some time to do it's job)
boolean interrupted = false;
try {
long sleepTime = 1;
int sleeps = 0;
while (true) {
if (tryReserveMemory(size, cap)) {
return;
}
if (sleeps >= MAX_SLEEPS) {
break;
}
if (!jlra.tryHandlePendingReference()) {
try {
Thread.sleep(sleepTime);
sleepTime <<= 1;
sleeps++;
} catch (InterruptedException e) {
interrupted = true;
}
}
}
// no luck
throw new OutOfMemoryError("Direct buffer memory");
} finally {
if (interrupted) {
// don't swallow interrupts
Thread.currentThread().interrupt();
}
}
}
如果堆内存不够分配的话,jlra.tryHandlePendingReference()
将触发一次非阻塞的Reference#tryHandlePending(false)
,该方法会将已经被JVM垃圾回收的DirectBuffer对象的堆外内存释放。
如果还是无法释放足够的内存,将会触发System.gc()
,该方法会触发一个full gc,如果JVM参数没有设置-XX:+DisableExplicitGC
。但是调用System.gc()并不能够保证full gc马上就能被执行。所以在后面打代码中,会进行最多9次尝试,看是否有足够的可用堆外内存来分配堆外内存。并且每次尝试之前,都对延迟等待时间,已给JVM足够的时间去完成full gc操作。如果9次尝试后依旧没有足够的可用堆外内存来分配本次堆外内存,则抛出OutOfMemoryError("Direct buffer memory”)异常。
之所以用使用full gc的很重要的一个原因是:System.gc()会对新生代和老生代都进行内存回收,这样会比较彻底地回收DirectByteBuffer对象以及他们关联的堆外内存。
有两个问题:堆外内存是多大?
代码中maxMemory = VM.maxDirectMemory();
private static long directMemory = 64 * 1024 * 1024; //64MB
public static long maxDirectMemory() {
return directMemory;
}
实际上在 JVM启动时,会对System做初始化,实际上堆外内存的大小设置逻辑为:
如果通过-Dsun.nio.MaxDirectMemorySize指定了这个属性,只要它不等于-1,那效果和加了-XX:MaxDirectMemorySize一样的,如果两个参数都没指定,那么最大堆外内存的值来自于directMemory = Runtime.getRuntime().maxMemory()。
其中在我们使用CMS GC的情况下的实现如下,其实是新生代的最大值-一个survivor的大小+老生代的最大值,也就是我们设置的-Xmx的值里除去一个survivor的大小就是默认的堆外内存的大小。
堆外内存的回收机制是什么?
Cleaner
是PhantomReference的子类,并通过自身的next和prev字段维护的一个双向链表
。PhantomReference
的作用在于跟踪垃圾回收过程,并不会对对象的垃圾回收过程造成任何的影响。
DirectByteBuffer对象在创建的时候关联了一个Cleaner,(cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
)说到PhantomReference它其实主要是用来跟踪对象何时被回收的,它不能影响gc决策,但是gc过程中如果发现某个对象除了只有PhantomReference引用它之外,并没有其他的地方引用它了,那将会把这个引用放到java.lang.ref.Reference.pending队列里,在gc完毕的时候通知ReferenceHandler这个守护线程去执行一些后置处理,而DirectByteBuffer关联的cleaner是PhantomReference的一个子类,在最终的处理里会通过Unsafe的free接口来释放DirectByteBuffer对应的堆外内存块.
private static class ReferenceHandler extends Thread {
ReferenceHandler(ThreadGroup g, String name) {
super(g, name);
}
public void run() {
for (;;) {
Reference r;
synchronized (lock) {
if (pending != null) {
r = pending;
Reference rn = r.next;
pending = (rn == r) ? null : rn;
r.next = r;
} else {
try {
lock.wait();
} catch (InterruptedException x) { }
continue;
}
}
// Fast path for cleaners
if (r instanceof Cleaner) {
//
((Cleaner)r).clean();
continue;
}
ReferenceQueue q = r.queue;
if (q != ReferenceQueue.NULL) q.enqueue(r);
}
}
}
//如果System.gc();
被禁止,也会触发堆外内存的回收
Reference#tryHandlePending(false)
static boolean tryHandlePending(boolean var0) {
Reference var1;
Cleaner var2;
try {
Reference.Lock var3 = lock;
synchronized(lock) {
if (pending == null) {
if (var0) {
lock.wait();
}
return var0;
}
var1 = pending;
//cleaner对象
var2 = var1 instanceof Cleaner ? (Cleaner)var1 : null;
pending = var1.discovered;
var1.discovered = null;
}
} catch (OutOfMemoryError var6) {
Thread.yield();
return true;
} catch (InterruptedException var7) {
return true;
}
if (var2 != null) {
//clean方法回收
var2.clean();
return true;
} else {
ReferenceQueue var8 = var1.queue;
if (var8 != ReferenceQueue.NULL) {
var8.enqueue(var1);
}
return true;
}
}
public void clean() {
//将当前Cleaner从Cleaner链表中移除,这样当clean()执行完后,Cleaner就是一个无引用指向的对象了,也就是可被GC回收的对象
if (remove(this)) {
try {
//thunk 在directByteBuffer 是Deallocator 对象
this.thunk.run();
} catch (final Throwable var2) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
if (System.err != null) {
(new Error("Cleaner terminated abnormally", var2)).printStackTrace();
}
System.exit(1);
return null;
}
});
}
}
}
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}
public void run() {
if (address == 0) {
// Paranoia
return;
}
//回收掉堆外内存
unsafe.freeMemory(address);
address = 0;
//修改堆外内存的剩余容量大小
Bits.unreserveMemory(size, capacity);
}
所以如果一直触发不了cms gc或者full gc,老年代的DirectByteBuffer对象不能被回收,那么堆外内存就一直不能被回收,可能导致内存泄漏。
参考资料:
http://ifeve.com/buffers/
http://lovestblog.cn/blog/2015/05/12/direct-buffer/
https://www.jianshu.com/p/007052ee3773
网友评论