美文网首页NIO
netty 面试 轻量级对象池Recycler原理

netty 面试 轻量级对象池Recycler原理

作者: 田才 | 来源:发表于2020-03-11 22:25 被阅读0次

    本文基于 netty 4.1.46

    以下为了方便描述,用户缓存的对象简称 T

    一、Recycler 解决的问题

    Recycler 即轻量级对象池,避免同一个线程重复创建对象。和异线程回收后的重用问题,例如:线程1创建对象T,线程2回收了对象T,线程1仍然可以通过get方法拿到对象T。
    避免重复创建对象的好处是,减少了因创建对象的系统消耗,减轻了jvm yong gc 回收对象的压力。

    注意: 何时用轻量级对象池 Recycler , "通过对象池来避免创建对象并不是一种好的做法,除非池中的对象是非常重量级的,现代的JVM实现具有高度优化的垃圾回收期,其性能很容易就会超过轻量级对象池的性能" 《Effective java》。举个例子:netty 的 PooledDirectByteBuf 就适合用对象池,因为堆外内存的分配,要系统调用有用户态和内核态的切换比较耗时,所以非常适用对象池。

    二、使用方法

    public class RecycleTest {
        private static Recycler<User> RECYCLER = new Recycler<User>() {
            @Override
            protected User newObject(Handle handle) {
                return new User(handle);
            }
        };
        private static class User {
            private final Recycler.Handle<User> handle;
            public User(Recycler.Handle<User> handle) {
                this.handle = handle;
            }
            public void recycle() {
                handle.recycle(this);
            }
        }
    
        public static void main(String[] args) {
            final User user = RECYCLER.get();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //异线程回收
                    user.recycle();
                }
            }).start();
            final User user1 = RECYCLER.get();
            //同线程回收
            user1.recycle();
        }
    }
    

    三、Recycler 的实现

    1、Recycler 的 uml 图
    clipboard.png
    2、Stack 类的结构
    clipboard.png

    注意:

    每一个Recycler对象包含一个FastThreadLocal<Stack<T>> threadLocal实例;每一个线程包含一个Stack对象,该Stack对象包含一个DefaultHandle[],而DefaultHandle中有一个属性T,用于存储真实对象。也就是说,每一个T都会被包装成一个DefaultHandle对象

    Recyler类包含一个类对象
    static FastThreadLocal<Map<Stack<T>, WeakOrderQueue>>
    每一个线程对象包含一个 Map<Stack<T>, WeakOrderQueue>,存储着当前线程中所有回收其他线程的 T 对象的集合,包括其他线程不同的 Recyler 实例产生的变量产生的 T 或 U 对象,并且分别为其创建的 WeakOrderQueue 对象

    WeakOrderQueue 对象中存储一个以Head为首的Link数组,每个Link对象中存储一个DefaultHandle[]数组,用于存放回收对象。

    说明:(假设线程A创建对象T)

    线程 A 回收 T 时,直接将 T 的 DefaultHandle 对象,压入 Stack的DefaultHandle[] elements 中;

    线程 B 回收 T 时,会首先从其 Map<Stack<T>, WeakOrderQueue> 对象中 get('线程A的Stack对象')拿到 WeakOrderQueue,然后直接将T的DefaultHandle对象压入 WeakOrderQueue 中的Link 链表中的尾部 Link 的 DefaultHandle[] 中,同时,这个 WeakOrderQueue 会与线程A的Stack中的 head 属性进行关联,用于后续对象的 pop 操作;

    当线程 A 从对象池获取对象时,如果线程A的Stack中有对象,则直接弹出;如果没有对象,则先从其 head 属性所指向的 WeakorderQueue 开始遍历 queue 链表,将T对象从其他线程的WeakOrderQueue中转移到线程A的Stack中(一次pop操作只转移一个包含了元素的Link),再从 Stack 中弹出。

    四、源码分析

    1 、变量对应关系和含义
    静态变量 默认值 Recycler 实例变量 Stack 实例变量 含义
    DEFAULT_MAX_CAPACITY_PER_THREAD 4 * 1024 maxCapacityPerThread availableSharedCapacity 当前线程创建的 Handle 对象,被其他线程回收,并且能缓冲到 WeakOrderQueue 中 Handle 的数量
    INITIAL_CAPACITY 256 Stack 中 DefaultHandle<?>[] elements 容量
    LINK_CAPACITY 16 Link 中的数组 DefaultHandle<?>[] elements 容量
    MAX_DELAYED_QUEUES_PER_THREAD 2*cpu maxDelayedQueuesPerThread maxDelayedQueues 当前线程创建的对象能够让多少个线程进行回收
    8 interval 控制对象回收频率

    2、获取对象get()

    /**
     * 1、获取当前线程的 Stack
     * 2、从 Stack 里弹出对象
     * 3、创建对象并且和 Stack 绑定
     */
    @SuppressWarnings("unchecked")
    public final T get() {
        //如果 Stack 的容量为 0
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        //获取当前线程的 Stack
        Stack<T> stack = threadLocal.get();
        //从 Stack 里弹出对象
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            //创建对象并且和 Stack 绑定,回收对象时,可以通过 Handle 调用 Stack 进行回收
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }
    
    2.1、从Stack中取出对象 pop 方法
    @SuppressWarnings({ "unchecked", "rawtypes" })
    DefaultHandle<T> pop() {
        int size = this.size;
        //当前线程如果没有对象
        if (size == 0) {
            //试着从其他线程回收的 WeakOrderQueue 对象中取 Handle
            if (!scavenge()) {
                return null;
            }
            size = this.size;
            if (size <= 0) {
                // double check, avoid races
                return null;
            }
        }
        size --;
        DefaultHandle ret = elements[size];
        elements[size] = null;
        // As we already set the element[size] to null we also need to store the updated size before we do
        // any validation. Otherwise we may see a null value when later try to pop again without a new element
        // added before.
        this.size = size;
    
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        return ret;
    }
    
    2.2 scavengeSome () 方法试着从其他线程回收的 WeakOrderQueue 对象中取 Handle
    private boolean scavengeSome() {
        WeakOrderQueue prev;
        WeakOrderQueue cursor = this.cursor;
        if (cursor == null) {
            prev = null;
            cursor = head;
            if (cursor == null) {
                return false;
            }
        } else {
            prev = this.prev;
        }
    
        boolean success = false;
        do {
            //将 WeakOrderQueue 中的数据拿到 Stack 中的 DefaultHandle<?>[] elements
            if (cursor.transfer(this)) {
                success = true;
                break;
            }//如果没有拿到
    
            WeakOrderQueue next = cursor.getNext();
            //cursor 锁持有的线程 被释放掉了,那么要做一些清理工作
            if (cursor.get() == null) {
                // If the thread associated with the queue is gone, unlink it, after
                // performing a volatile read to confirm there is no data left to collect.
                // We never unlink the first queue, as we don't want to synchronize on updating the head.
                if (cursor.hasFinalData()) {//如果有数据
                    for (;;) {
                        if (cursor.transfer(this)) {
                            success = true;
                        } else {
                            break;
                        }
                    }
                }
    
                if (prev != null) {
                    // Ensure we reclaim all space before dropping the WeakOrderQueue to be GC'ed.
                    cursor.reclaimAllSpaceAndUnlink();
                    //删除 cursor 节点
                    prev.setNext(next);
                }
            } else {
                //否则移动 prev 指针到下一个节点
                prev = cursor;
            }
            //移动 cursor 指针到下一个节点
            cursor = next;
    
            //如果已经到结尾了,或者 已经找到了,那么循环退出
        } while (cursor != null && !success);
    
        this.prev = prev;
        this.cursor = cursor;
        return success;
    }
    
    2.3 transfer(this)) 方法;将 WeakOrderQueue 中的数据拿到 Stack 中的 DefaultHandle<?>[] elements
     boolean transfer(Stack<?> dst) {
            Link head = this.head.link;
            //如果当前 Link 中没有数据
            if (head == null) {
                return false;
            }
            //当前Link中的数据 都没取走了
            if (head.readIndex == LINK_CAPACITY) {
                //如果 Link 队列中没有节点了
                if (head.next == null) {
                    return false;
                }
                //将当前 Head 节点移除
                head = head.next;
                this.head.relink(head);
            }
            //head 节点已经被取走的数量
            final int srcStart = head.readIndex;
            //head 节点的 Handle 数量
            int srcEnd = head.get();
            //head 节点可读的 handle 数量
            final int srcSize = srcEnd - srcStart;
            //head 节点已经没有可读 handle 对象了
            if (srcSize == 0) {
                return false;
            }
            //当前 Stack 中 DefaultHandle 的数量
            final int dstSize = dst.size;
            final int expectedCapacity = dstSize + srcSize;
            //head 节点中的 Link 的 Handle 全部读出来, Stack 中 DefaultHandle[] elements 能否放的下
            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                //算出 需要数组的最大 下标
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }
    
            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle<?> element = srcElems[i];
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;
                    //控制回收频率
                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }
                // 遍历下一个节点
                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    this.head.relink(head.next);
                }
    
                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                // The destination stack is full already.
                return false;
            }
        }
    }
    

    3、回收对象 recycle()

    /**
     * 1、同线程回收对象
     * 2、异线程回收对象
     */
    @Override
    public void recycle(Object object) {
        if (object != value) {
            throw new IllegalArgumentException("object does not belong to handle");
        }
    
        //此处的 stack 是创建 Handle 对象的线程持有的 stack
        Stack<?> stack = this.stack;
        if (lastRecycledId != recycleId || stack == null) {
            throw new IllegalStateException("recycled already");
        }
    
        stack.push(this);
    }
    
    void push(DefaultHandle<?> item) {
        Thread currentThread = Thread.currentThread();
        //当前线程是否是创建 stack 的线程
        if (threadRef.get() == currentThread) {
            // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
            pushNow(item);
        } else {
            // The current Thread is not the one that belongs to the Stack
            // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
            // happens later.
            pushLater(item, currentThread);
        }
    }
    
    3.1 同线程回收对象
    private void pushNow(DefaultHandle<?> item) {
        if ((item.recycleId | item.lastRecycledId) != 0) {
            throw new IllegalStateException("recycled already");
        }
        item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
    
        int size = this.size;
        //判断存放 handle 数量是否达到上限, 或者
        if (size >= maxCapacity || dropHandle(item)) {
            // Hit the maximum capacity or should drop - drop the possibly youngest object.
            return;
        }
        //如果数组被填充满了,那么重新创建一个 两倍大小的数组,上限是maxCapacity
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
        }
    
        elements[size] = item;
        this.size = size + 1;
    }
    
    3.2 异线程回收对象
    /**
     * 异线程回收对象
     *
     * 1、获取 WeakOrderQueue
     * 2、创建 WeakOrderQueue
     * 3、将对象追加到 WeakOrderQueue
     */
    private void pushLater(DefaultHandle<?> item, Thread thread) {
        if (maxDelayedQueues == 0) {
            // We don't support recycling across threads and should just drop the item on the floor.
            return;
        }
    
        // we don't want to have a ref to the queue as the value in our weak map
        // so we null it out; to ensure there are no races with restoring it later
        // we impose a memory ordering here (no-op on x86)
        Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
        // this 是指创建 handle 的线程持有的 Stack
        WeakOrderQueue queue = delayedRecycled.get(this);
        //当前线程没有回收过,创建 handle 的线程所持有 Stack 的 handle
        if (queue == null) {
            //当前线程回收的 Stack 数量是否 达到上限
            if (delayedRecycled.size() >= maxDelayedQueues) {
                // Add a dummy queue so we know we should drop the object
                delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                return;
            }
            // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
            if ((queue = newWeakOrderQueue(thread)) == null) {
                // drop object
                return;
            }
            delayedRecycled.put(this, queue);
        } else if (queue == WeakOrderQueue.DUMMY) {
            // drop object
            return;
        }
    
        queue.add(item);
    }
    

    创建 WeakOrderQueue,插入到创建 handle 的线程持有的 Stack 中的 head 在链表头部插入。

    static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
        // We allocated a Link so reserve the space
        // 被其他线程 缓冲的对象 有没有超过上限
        if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
            return null;
        }
        final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
        // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
        // may be accessed while its still constructed.
        //插入到创建 handle 的线程持有的 Stack 中的 head 在链表头部插入 queue 例如:  queue(当前线程回收的) --> queue1(线程1回收的) --> queue2(线程2回收的)
        stack.setHead(queue);
    
        return queue;
    }
    

    相关文章

      网友评论

        本文标题:netty 面试 轻量级对象池Recycler原理

        本文链接:https://www.haomeiwen.com/subject/awrkjhtx.html