本文基于 netty 4.1.46
以下为了方便描述,用户缓存的对象简称 T
一、Recycler 解决的问题
Recycler 即轻量级对象池,避免同一个线程重复创建对象。和异线程回收后的重用问题,例如:线程1创建对象T,线程2回收了对象T,线程1仍然可以通过get方法拿到对象T。
避免重复创建对象的好处是,减少了因创建对象的系统消耗,减轻了jvm yong gc 回收对象的压力。
注意: 何时用轻量级对象池 Recycler , "通过对象池来避免创建对象并不是一种好的做法,除非池中的对象是非常重量级的,现代的JVM实现具有高度优化的垃圾回收期,其性能很容易就会超过轻量级对象池的性能" 《Effective java》。举个例子:netty 的 PooledDirectByteBuf 就适合用对象池,因为堆外内存的分配,要系统调用有用户态和内核态的切换比较耗时,所以非常适用对象池。
二、使用方法
public class RecycleTest {
private static Recycler<User> RECYCLER = new Recycler<User>() {
@Override
protected User newObject(Handle handle) {
return new User(handle);
}
};
private static class User {
private final Recycler.Handle<User> handle;
public User(Recycler.Handle<User> handle) {
this.handle = handle;
}
public void recycle() {
handle.recycle(this);
}
}
public static void main(String[] args) {
final User user = RECYCLER.get();
new Thread(new Runnable() {
@Override
public void run() {
//异线程回收
user.recycle();
}
}).start();
final User user1 = RECYCLER.get();
//同线程回收
user1.recycle();
}
}
三、Recycler 的实现
1、Recycler 的 uml 图
clipboard.png2、Stack 类的结构
clipboard.png注意:
每一个Recycler对象包含一个FastThreadLocal<Stack<T>> threadLocal实例;每一个线程包含一个Stack对象,该Stack对象包含一个DefaultHandle[],而DefaultHandle中有一个属性T,用于存储真实对象。也就是说,每一个T都会被包装成一个DefaultHandle对象
Recyler类包含一个类对象
static FastThreadLocal<Map<Stack<T>, WeakOrderQueue>>
每一个线程对象包含一个 Map<Stack<T>, WeakOrderQueue>,存储着当前线程中所有回收其他线程的 T 对象的集合,包括其他线程不同的 Recyler 实例产生的变量产生的 T 或 U 对象,并且分别为其创建的 WeakOrderQueue 对象
WeakOrderQueue 对象中存储一个以Head为首的Link数组,每个Link对象中存储一个DefaultHandle[]数组,用于存放回收对象。
说明:(假设线程A创建对象T)
线程 A 回收 T 时,直接将 T 的 DefaultHandle 对象,压入 Stack的DefaultHandle[] elements 中;
线程 B 回收 T 时,会首先从其 Map<Stack<T>, WeakOrderQueue> 对象中 get('线程A的Stack对象')拿到 WeakOrderQueue,然后直接将T的DefaultHandle对象压入 WeakOrderQueue 中的Link 链表中的尾部 Link 的 DefaultHandle[] 中,同时,这个 WeakOrderQueue 会与线程A的Stack中的 head 属性进行关联,用于后续对象的 pop 操作;
当线程 A 从对象池获取对象时,如果线程A的Stack中有对象,则直接弹出;如果没有对象,则先从其 head 属性所指向的 WeakorderQueue 开始遍历 queue 链表,将T对象从其他线程的WeakOrderQueue中转移到线程A的Stack中(一次pop操作只转移一个包含了元素的Link),再从 Stack 中弹出。
四、源码分析
1 、变量对应关系和含义
静态变量 | 默认值 | Recycler 实例变量 | Stack 实例变量 | 含义 |
---|---|---|---|---|
DEFAULT_MAX_CAPACITY_PER_THREAD | 4 * 1024 | maxCapacityPerThread | availableSharedCapacity | 当前线程创建的 Handle 对象,被其他线程回收,并且能缓冲到 WeakOrderQueue 中 Handle 的数量 |
INITIAL_CAPACITY | 256 | Stack 中 DefaultHandle<?>[] elements 容量 | ||
LINK_CAPACITY | 16 | Link 中的数组 DefaultHandle<?>[] elements 容量 | ||
MAX_DELAYED_QUEUES_PER_THREAD | 2*cpu | maxDelayedQueuesPerThread | maxDelayedQueues | 当前线程创建的对象能够让多少个线程进行回收 |
8 | interval | 控制对象回收频率 |
2、获取对象get()
/**
* 1、获取当前线程的 Stack
* 2、从 Stack 里弹出对象
* 3、创建对象并且和 Stack 绑定
*/
@SuppressWarnings("unchecked")
public final T get() {
//如果 Stack 的容量为 0
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
//获取当前线程的 Stack
Stack<T> stack = threadLocal.get();
//从 Stack 里弹出对象
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
//创建对象并且和 Stack 绑定,回收对象时,可以通过 Handle 调用 Stack 进行回收
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
2.1、从Stack中取出对象 pop 方法
@SuppressWarnings({ "unchecked", "rawtypes" })
DefaultHandle<T> pop() {
int size = this.size;
//当前线程如果没有对象
if (size == 0) {
//试着从其他线程回收的 WeakOrderQueue 对象中取 Handle
if (!scavenge()) {
return null;
}
size = this.size;
if (size <= 0) {
// double check, avoid races
return null;
}
}
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
// As we already set the element[size] to null we also need to store the updated size before we do
// any validation. Otherwise we may see a null value when later try to pop again without a new element
// added before.
this.size = size;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
return ret;
}
2.2 scavengeSome () 方法试着从其他线程回收的 WeakOrderQueue 对象中取 Handle
private boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
prev = null;
cursor = head;
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
do {
//将 WeakOrderQueue 中的数据拿到 Stack 中的 DefaultHandle<?>[] elements
if (cursor.transfer(this)) {
success = true;
break;
}//如果没有拿到
WeakOrderQueue next = cursor.getNext();
//cursor 锁持有的线程 被释放掉了,那么要做一些清理工作
if (cursor.get() == null) {
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) {//如果有数据
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
// Ensure we reclaim all space before dropping the WeakOrderQueue to be GC'ed.
cursor.reclaimAllSpaceAndUnlink();
//删除 cursor 节点
prev.setNext(next);
}
} else {
//否则移动 prev 指针到下一个节点
prev = cursor;
}
//移动 cursor 指针到下一个节点
cursor = next;
//如果已经到结尾了,或者 已经找到了,那么循环退出
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
2.3 transfer(this)) 方法;将 WeakOrderQueue 中的数据拿到 Stack 中的 DefaultHandle<?>[] elements
boolean transfer(Stack<?> dst) {
Link head = this.head.link;
//如果当前 Link 中没有数据
if (head == null) {
return false;
}
//当前Link中的数据 都没取走了
if (head.readIndex == LINK_CAPACITY) {
//如果 Link 队列中没有节点了
if (head.next == null) {
return false;
}
//将当前 Head 节点移除
head = head.next;
this.head.relink(head);
}
//head 节点已经被取走的数量
final int srcStart = head.readIndex;
//head 节点的 Handle 数量
int srcEnd = head.get();
//head 节点可读的 handle 数量
final int srcSize = srcEnd - srcStart;
//head 节点已经没有可读 handle 对象了
if (srcSize == 0) {
return false;
}
//当前 Stack 中 DefaultHandle 的数量
final int dstSize = dst.size;
final int expectedCapacity = dstSize + srcSize;
//head 节点中的 Link 的 Handle 全部读出来, Stack 中 DefaultHandle[] elements 能否放的下
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
//算出 需要数组的最大 下标
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle<?> element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
//控制回收频率
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
// 遍历下一个节点
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
this.head.relink(head.next);
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
}
3、回收对象 recycle()
/**
* 1、同线程回收对象
* 2、异线程回收对象
*/
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
//此处的 stack 是创建 Handle 对象的线程持有的 stack
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this);
}
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
//当前线程是否是创建 stack 的线程
if (threadRef.get() == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
3.1 同线程回收对象
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
//判断存放 handle 数量是否达到上限, 或者
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
//如果数组被填充满了,那么重新创建一个 两倍大小的数组,上限是maxCapacity
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
3.2 异线程回收对象
/**
* 异线程回收对象
*
* 1、获取 WeakOrderQueue
* 2、创建 WeakOrderQueue
* 3、将对象追加到 WeakOrderQueue
*/
private void pushLater(DefaultHandle<?> item, Thread thread) {
if (maxDelayedQueues == 0) {
// We don't support recycling across threads and should just drop the item on the floor.
return;
}
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
// this 是指创建 handle 的线程持有的 Stack
WeakOrderQueue queue = delayedRecycled.get(this);
//当前线程没有回收过,创建 handle 的线程所持有 Stack 的 handle
if (queue == null) {
//当前线程回收的 Stack 数量是否 达到上限
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = newWeakOrderQueue(thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
queue.add(item);
}
创建 WeakOrderQueue,插入到创建 handle 的线程持有的 Stack 中的 head 在链表头部插入。
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
// 被其他线程 缓冲的对象 有没有超过上限
if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
return null;
}
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
// Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
// may be accessed while its still constructed.
//插入到创建 handle 的线程持有的 Stack 中的 head 在链表头部插入 queue 例如: queue(当前线程回收的) --> queue1(线程1回收的) --> queue2(线程2回收的)
stack.setHead(queue);
return queue;
}
网友评论