美文网首页
Netty的对象缓存池设计

Netty的对象缓存池设计

作者: 知止9528 | 来源:发表于2019-02-03 15:21 被阅读10次

对象池

将对象实例缓存起来供后续分配使用,避免瞬时大量对象的反复生成和销毁造成的分配和GC压力


设计思路

单线程

直接使用一个List容器,最简单

多线程

Netty是异步编程框架,往往申请内存的线程和释放内存的线程时不同的,所以我们猜想的是每个线程都有一个类似List的容器,不同线程的对象可以暂时放在容器内


netty内存池回收流程


netty内存池.png

Stack是通过线程变量的方式存储,来避免多线程之间的竞争

Stack与WeakOrderQueue.png

Stack维护了一个DefaultHandle数据,当数组为空时,就需要从其它线程获取,Map<Stack<?>, WeakOrderQueue>每个Stack都有一个对应的WeakOrderQueue,多个线程之间的WeakOrderQueue通过链表指针进行关联起来,然后每个WeakOrderQueue的内部数据结构如下


WeakOrderQueue数据结构.png

即WeakOrderQueue内部又维护了一个Link链表
我们再看Link的数据结构


Link数据结构.png

我们还记得刚才Stack也是维护了一个Handle数组,所以Link内部也是维护了一个Handle数组


回收链条.png

下面我们分析下源码

public class RecyclerTest {
    private  static  final  Recycler<User> RECYCLER = new Recycler<User>() {
        @Override
        protected User newObject(Handle<User> handle) {
            return new User(handle);
        }
    };
    public static void main(String[] args) {
        User user = RECYCLER.get();
        user.recyle();
        User user1 = RECYCLER.get();
        System.out.println(user==user1);
    }

    private  static  class  User{
        private  final Recycler.Handle<User> handle;

        private User(Recycler.Handle<User> handle) {
            this.handle = handle;
        }
        public  void recyle(){
            handle.recycle(this);
        }
    }
}

即是通过handle.recycle()和get()方法来进行操作的
get()方法

public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

recycle()方法

@Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            stack.push(this);
        }

先来分析下handle.recycle()是怎么把对象放入的

void push(DefaultHandle<?> item) {
            Thread currentThread = Thread.currentThread();
            if (threadRef.get() == currentThread) {
                // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
                pushNow(item);  当前线程,则立马进行放入
            } else {
                // The current Thread is not the one that belongs to the Stack
                // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
                // happens later.
                pushLater(item, currentThread);
            }
        }

同线程回收

private void pushNow(DefaultHandle<?> item) {
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            }
            item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

            int size = this.size;
            if (size >= maxCapacity || dropHandle(item)) {
                // Hit the maximum capacity or should drop - drop the possibly youngest object.
                return;
            }
            if (size == elements.length) {
                elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
            }

            elements[size] = item;
            this.size = size + 1;
        }

比较简单,就不多解释了


异线程回收

private void pushLater(DefaultHandle<?> item, Thread thread) {
            // we don't want to have a ref to the queue as the value in our weak map
            // so we null it out; to ensure there are no races with restoring it later
            // we impose a memory ordering here (no-op on x86)
            Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
            WeakOrderQueue queue = delayedRecycled.get(this);
            if (queue == null) {
                if (delayedRecycled.size() >= maxDelayedQueues) {
                    // Add a dummy queue so we know we should drop the object
                    delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                    return;
                }
                // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
                if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
                    // drop object
                    return;
                }
                delayedRecycled.put(this, queue);
            } else if (queue == WeakOrderQueue.DUMMY) {
                // drop object
                return;
            }

            queue.add(item);
        }

存在WeakOrderQueue就直接放入,不存在则新建WeakOrderQueue


我们再来看是怎么取出对象的

DefaultHandle<T> pop() {
            int size = this.size;
            if (size == 0) {  当stack里面获取不到时,需要考虑从其它线程获取
                if (!scavenge()) {
                    return null;
                }
                size = this.size;
            }
            size --;
            DefaultHandle ret = elements[size];  使用数组存储
            elements[size] = null;
            if (ret.lastRecycledId != ret.recycleId) {
                throw new IllegalStateException("recycled multiple times");
            }
            ret.recycleId = 0;
            ret.lastRecycledId = 0;
            this.size = size;
            return ret;
        }

即先从本地Stack中的Handle数组中取,没有的话则调用scavenge()从其它线程取
scavenge()方法如下

boolean scavenge() {
            // continue an existing scavenge, if any
            if (scavengeSome()) {
                return true;
            }

            // reset our scavenge cursor
            prev = null;
            cursor = head;
            return false;
        }

scavengeSome() 方法如下

 boolean scavengeSome() {
            WeakOrderQueue prev;
            WeakOrderQueue cursor = this.cursor;
            if (cursor == null) {
                prev = null;
                cursor = head;
                if (cursor == null) {
                    return false;
                }
            } else {
                prev = this.prev;
            }

            boolean success = false;
            do {
                if (cursor.transfer(this)) {  着重看这个方法,即从WeakOrderQueue里面的Link将Handle传送至Stack的Handle数组
                    success = true;
                    break;
                }
                WeakOrderQueue next = cursor.next;
                if (cursor.owner.get() == null) {  这里是假如有线程销毁了,那对应的WeakOrderQueue也会销毁,但如果该WeakOderQueue里面还有对象的话,要尽量回收
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    if (cursor.hasFinalData()) {
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }

                    if (prev != null) {
                        prev.setNext(next);
                    }
                } else {
                    prev = cursor;
                }

                cursor = next;

            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }

transfer()方法如下

 boolean transfer(Stack<?> dst) {
            Link head = this.head.link;
            if (head == null) {
                return false;
            }

            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                this.head.link = head = head.next;
            }

            final int srcStart = head.readIndex;
            int srcEnd = head.get();
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

            final int dstSize = dst.size;
            final int expectedCapacity = dstSize + srcSize;

            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements; Link里面的Handle数组
                final DefaultHandle[] dstElems = dst.elements;  Stack里面的Handle数组
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle element = srcElems[i];
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    this.head.reclaimSpace(LINK_CAPACITY);
                    this.head.link = head.next;
                }

                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                // The destination stack is full already.
                return false;
            }
        }
    }

关于Link里面的readIndex以及srcEnd可以参考Mysql的redolog的图


image

write pos是当前记录的位置,一边写一边后移,写到3号末尾就回到0号开头.

checkpoint是当前要擦除的位置,也是往后移并且循环的,擦除前需要把记录更新到数据文件.


重复回收检测

为了避免重复回收对象,可回收对象包含了两个属性recycleId和lastRecycleId. lastRecycleId用来存储最后一次回收线程的ID,recycleId在元素放入Stack数组时设置值域lastRecycleId相等,后续通过是否相等来判断是否重复回收

  • 从对象池取出的对象时判断recycleId和lastRecycleId是否相等,相等则设置两个id为0,否则抛出异常
    *对象回收至线程时判断是否2个ID为0,是则设置recycleId=lastRecycleId=本线程唯一ID,否则抛出异常
    *对象回收至其它线程时设置lastResycleId=回收线程ID
    *对象从其它线程转移至stack时如果recycleId为0则设置recycleId=lastRecycleId,如果recycleId不为0,意味着再其它线程也执行了回收动作,抛出异常.

相关文章

  • Netty的对象缓存池设计

    对象池 将对象实例缓存起来供后续分配使用,避免瞬时大量对象的反复生成和销毁造成的分配和GC压力 设计思路 单线程 ...

  • netty-Buffer轻量级对象池实现分析

    netty是使用threadlocal变量来实现轻量级的对象池的,每个线程都拥有自己的对象池。netty自己实...

  • Netty对象池-RECYCLER原理

    Netty对象池-RECYCLER原理 1.RECYCLER专门分配和回收netty的bytebuf对象的 2.其...

  • 享元模式-构建简易缓存池

    本篇一起来写一个简易的缓存池,就是对象缓存池,就是设计模式中的享元模式。其实support包(Androidx一样...

  • Netty对象池

    在平时工作中,听说和使用过连接池,线程池等.还有一种就是对象池,可以实现对象复用的功能. 当然实现对象池的方式手段...

  • 享元模式

    简介 缓存共享对象,复用共享对象,比如各种池化操作,数据库连接池缓存连接,线程池缓存线程等等。 享元模式样...

  • EventBus中的享元模式

    减少对象的创建频率1.首先准备一个对象池缓存对象2.如果缓存中存在对象

  • netty源码分析4 - Recycler对象池的设计

    本节来看一下netty的Recycler对象池的设计与源码解析。相关的源码已经抽取为一个框架:https://gi...

  • Netty-对象池(Recycler)

    在netty的池化ByteBuf分配中,包含ByteBuf对象的池化和真实内存(array或者DirectByte...

  • 单例模式

    单例模式是创建型设计模式的一种。针对全局仅需一个对象的场景,如线程池、全局缓存、window 对象等

网友评论

      本文标题:Netty的对象缓存池设计

      本文链接:https://www.haomeiwen.com/subject/rcwysqtx.html