对象池
将对象实例缓存起来供后续分配使用,避免瞬时大量对象的反复生成和销毁造成的分配和GC压力
设计思路
单线程
直接使用一个List容器,最简单
多线程
Netty是异步编程框架,往往申请内存的线程和释放内存的线程时不同的,所以我们猜想的是每个线程都有一个类似List的容器,不同线程的对象可以暂时放在容器内
netty内存池回收流程

即Stack是通过线程变量的方式存储,来避免多线程之间的竞争

Stack维护了一个DefaultHandle数据,当数组为空时,就需要从其它线程获取,Map<Stack<?>, WeakOrderQueue>每个Stack都有一个对应的WeakOrderQueue,多个线程之间的WeakOrderQueue通过链表指针进行关联起来,然后每个WeakOrderQueue的内部数据结构如下

即WeakOrderQueue内部又维护了一个Link链表
我们再看Link的数据结构

我们还记得刚才Stack也是维护了一个Handle数组,所以Link内部也是维护了一个Handle数组

下面我们分析下源码
public class RecyclerTest {
private static final Recycler<User> RECYCLER = new Recycler<User>() {
@Override
protected User newObject(Handle<User> handle) {
return new User(handle);
}
};
public static void main(String[] args) {
User user = RECYCLER.get();
user.recyle();
User user1 = RECYCLER.get();
System.out.println(user==user1);
}
private static class User{
private final Recycler.Handle<User> handle;
private User(Recycler.Handle<User> handle) {
this.handle = handle;
}
public void recyle(){
handle.recycle(this);
}
}
}
即是通过handle.recycle()和get()方法来进行操作的
get()方法
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
recycle()方法
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}
先来分析下handle.recycle()是怎么把对象放入的
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item); 当前线程,则立马进行放入
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
同线程回收
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
比较简单,就不多解释了
异线程回收
private void pushLater(DefaultHandle<?> item, Thread thread) {
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
queue.add(item);
}
存在WeakOrderQueue就直接放入,不存在则新建WeakOrderQueue
我们再来看是怎么取出对象的
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) { 当stack里面获取不到时,需要考虑从其它线程获取
if (!scavenge()) {
return null;
}
size = this.size;
}
size --;
DefaultHandle ret = elements[size]; 使用数组存储
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
即先从本地Stack中的Handle数组中取,没有的话则调用scavenge()从其它线程取
scavenge()方法如下
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
scavengeSome() 方法如下
boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
prev = null;
cursor = head;
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
do {
if (cursor.transfer(this)) { 着重看这个方法,即从WeakOrderQueue里面的Link将Handle传送至Stack的Handle数组
success = true;
break;
}
WeakOrderQueue next = cursor.next;
if (cursor.owner.get() == null) { 这里是假如有线程销毁了,那对应的WeakOrderQueue也会销毁,但如果该WeakOderQueue里面还有对象的话,要尽量回收
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.setNext(next);
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
transfer()方法如下
boolean transfer(Stack<?> dst) {
Link head = this.head.link;
if (head == null) {
return false;
}
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head.link = head = head.next;
}
final int srcStart = head.readIndex;
int srcEnd = head.get();
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
final int dstSize = dst.size;
final int expectedCapacity = dstSize + srcSize;
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
final DefaultHandle[] srcElems = head.elements; Link里面的Handle数组
final DefaultHandle[] dstElems = dst.elements; Stack里面的Handle数组
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
this.head.reclaimSpace(LINK_CAPACITY);
this.head.link = head.next;
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
}
关于Link里面的readIndex以及srcEnd可以参考Mysql的redolog的图

write pos是当前记录的位置,一边写一边后移,写到3号末尾就回到0号开头.
checkpoint是当前要擦除的位置,也是往后移并且循环的,擦除前需要把记录更新到数据文件.
重复回收检测
为了避免重复回收对象,可回收对象包含了两个属性recycleId和lastRecycleId. lastRecycleId用来存储最后一次回收线程的ID,recycleId在元素放入Stack数组时设置值域lastRecycleId相等,后续通过是否相等来判断是否重复回收
- 从对象池取出的对象时判断recycleId和lastRecycleId是否相等,相等则设置两个id为0,否则抛出异常
*对象回收至线程时判断是否2个ID为0,是则设置recycleId=lastRecycleId=本线程唯一ID,否则抛出异常
*对象回收至其它线程时设置lastResycleId=回收线程ID
*对象从其它线程转移至stack时如果recycleId为0则设置recycleId=lastRecycleId,如果recycleId不为0,意味着再其它线程也执行了回收动作,抛出异常.
网友评论