FastThreadLocal

Author: PPchair | Published: 2018-05-06 16:20

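    FastThreadLocal in Netty

    Version: 4.1.23
    Most readers will have used the JDK's ThreadLocal. Each Thread holds a ThreadLocalMap, and that map stores its ThreadLocal entries in a ThreadLocalMap.Entry array. Storage works much like a HashMap: the slot index is computed as key.threadLocalHashCode & (len - 1), and collisions are resolved by linear probing (if the computed slot is occupied, the next slot is tried). Once a threshold is reached, the array must also be expanded and rehashed, so lookups are not especially fast. In addition, users of the JDK's ThreadLocal have to watch out for memory leaks. Netty, being a high-performance framework, reimplemented ThreadLocal as FastThreadLocal to address both problems. The rest of this article walks through how it does so.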
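To make the index calculation and linear probing described above concrete, here is a minimal sketch. It is not the JDK source: the class and method names are hypothetical, and it assumes a power-of-two table that always has at least one free slot.

```java
// Hypothetical sketch of open addressing with linear probing; not the JDK source.
final class LinearProbeSketch {

    /** Returns the slot holding key, or the first free slot that key would occupy. */
    static int findSlot(Object[] keys, Object key, int hash) {
        int len = keys.length;                 // assumed to be a power of two
        int i = hash & (len - 1);              // initial index: hash & (len - 1)
        while (keys[i] != null && keys[i] != key) {
            i = (i + 1) & (len - 1);           // occupied by another key: try the next slot
        }
        return i;
    }

    public static void main(String[] args) {
        Object[] keys = new Object[8];
        Object a = new Object();
        Object b = new Object();
        int ia = findSlot(keys, a, 5);
        keys[ia] = a;
        int ib = findSlot(keys, b, 13);        // 13 & 7 == 5, so b collides with a
        keys[ib] = b;
        System.out.println(ia + " " + ib);     // prints "5 6"
    }
}
```

Every lookup has to repeat this probe loop, which is the cost that an index-addressed array avoids.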

    1. Classes used internally, compared with ThreadLocal

    Item                 | JDK                    | Netty                                           | Notes
    Thread               | Thread                 | FastThreadLocalThread                           | Extends the JDK Thread; Netty creates it through its own DefaultThreadFactory
    Map                  | ThreadLocalMap         | InternalThreadLocalMap                          | The per-thread map
    Map's internal array | ThreadLocalMap.Entry[] | UnpaddedInternalThreadLocalMap.indexedVariables | Stores the thread-local values
    Runnable             | Runnable               | FastThreadLocalRunnable                         | Wraps a Runnable to prevent memory leaks
    ThreadLocal          | ThreadLocal            | FastThreadLocal                                 |
    Thread vs. FastThreadLocalThread
    // Extends Thread and keeps an InternalThreadLocalMap in a field instead of relying on Thread's ThreadLocalMap
    public class FastThreadLocalThread extends Thread {
    // This will be set to true if we have a chance to wrap the Runnable.
        private final boolean cleanupFastThreadLocals;
        private InternalThreadLocalMap threadLocalMap;
        
        // ... omitted
    }
    
    DefaultThreadFactory
    public class DefaultThreadFactory implements ThreadFactory {
        // ... omitted
    @Override
        public Thread newThread(Runnable r) {
            // Wrap the task in a FastThreadLocalRunnable
            Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
            try {
                if (t.isDaemon() != daemon) {
                    t.setDaemon(daemon);
                }
    
                if (t.getPriority() != priority) {
                    t.setPriority(priority);
                }
            } catch (Exception ignored) {
                // Doesn't matter even if failed to set.
            }
            return t;
        }
        // Create a FastThreadLocalThread
        protected Thread newThread(Runnable r, String name) {
            return new FastThreadLocalThread(threadGroup, r, name);
        }
    }
    
    FastThreadLocalRunnable
    // Implements Runnable
    final class FastThreadLocalRunnable implements Runnable {
        private final Runnable runnable;
    
        private FastThreadLocalRunnable(Runnable runnable) {
            this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
        }
    
        @Override
        public void run() {
            try {
                runnable.run();
            } finally {
                // The task has finished: remove all thread-locals to prevent memory leaks
                FastThreadLocal.removeAll();
            }
        }
    
        static Runnable wrap(Runnable runnable) {
            return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
        }
    }
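The wrap-and-clean-up idea can be tried out with nothing but the JDK. The following sketch uses hypothetical names and a plain JDK ThreadLocal as a stand-in for the state being cleaned; it only illustrates the try/finally pattern and is not Netty's class.

```java
// Hypothetical sketch of a Runnable wrapper that always cleans up thread-local state.
final class CleanupRunnableSketch implements Runnable {
    // Stand-in for thread-local state that must not outlive the task.
    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    private final Runnable delegate;

    private CleanupRunnableSketch(Runnable delegate) {
        this.delegate = delegate;
    }

    /** Wraps r unless it is already wrapped, so cleanup is never nested. */
    static Runnable wrap(Runnable r) {
        return r instanceof CleanupRunnableSketch ? r : new CleanupRunnableSketch(r);
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } finally {
            STATE.remove();   // runs even if the task throws
        }
    }
}
```

Because the cleanup sits in a finally block, a pooled thread that is reused for another task starts from a clean state.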
    
    UnpaddedInternalThreadLocalMap
    class UnpaddedInternalThreadLocalMap {
    
        // For threads that are not FastThreadLocalThread, values are still held in Netty's InternalThreadLocalMap,
        // but the map itself is stored in a JDK ThreadLocal.
        static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
        static final AtomicInteger nextIndex = new AtomicInteger();
    
        /** Used by {@link FastThreadLocal} */
        Object[] indexedVariables; // the backing array that holds each thread-local's value
    
        // Core thread-locals
        int futureListenerStackDepth;
        int localChannelReaderStackDepth;
        Map<Class<?>, Boolean> handlerSharableCache;
        IntegerHolder counterHashCode;
        ThreadLocalRandom random;
        Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
        Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
    
        // String-related thread-locals
        StringBuilder stringBuilder;
        Map<Charset, CharsetEncoder> charsetEncoderCache;
        Map<Charset, CharsetDecoder> charsetDecoderCache;
    
        // ArrayList-related thread-locals
        ArrayList<Object> arrayList;
    
        UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
            this.indexedVariables = indexedVariables;
        }
    }
    
    

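    2. FastThreadLocal source code

    The three indexes in FastThreadLocal
    // Index of the slot that holds the set of variables to remove; shared by all FastThreadLocal instances
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
    
    private final int index; // index of this variable's value
    private final int cleanerFlagIndex; // index of the flag recording whether a cleaner has already been registered (see registerCleaner)
    // Both are initialized in the constructor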
     public FastThreadLocal() {
            index = InternalThreadLocalMap.nextVariableIndex();
            cleanerFlagIndex = InternalThreadLocalMap.nextVariableIndex();
        }
    
    
    // InternalThreadLocalMap: hands out the next index
        public static int nextVariableIndex() {
            int index = nextIndex.getAndIncrement(); // nextIndex is an AtomicInteger
            if (index < 0) {
                nextIndex.decrementAndGet();
                throw new IllegalStateException("too many thread-local indexed variables");
            }
            return index;
        }
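The effect of handing out indexes from a shared counter can be sketched as follows. All names here are hypothetical and the per-thread table is passed in explicitly to keep the example small; it is an illustration of the idea rather than Netty's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical names throughout; a sketch of index-based slots, not Netty's code.
final class IndexedSlotSketch {
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();

    /** Hands out a process-wide unique, monotonically increasing index. */
    static int nextIndex() {
        int index = NEXT_INDEX.getAndIncrement();
        if (index < 0) {                     // the counter overflowed
            NEXT_INDEX.decrementAndGet();
            throw new IllegalStateException("too many indexed variables");
        }
        return index;
    }

    /** A variable claims its slot once, at construction time. */
    static final class Var {
        final int index = nextIndex();

        Object read(Object[] perThreadTable) {
            // Direct array access: no hashing and no probing.
            return index < perThreadTable.length ? perThreadTable[index] : null;
        }
    }
}
```

Since the index is fixed when the variable is created, every thread finds that variable's value at the same position in its own array.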
    
    
    set()

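    Setting a value is the most involved part: it creates the InternalThreadLocalMap if needed, adds the variable to the to-remove set, and, for threads that are not FastThreadLocalThread, registers a cleanup task.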

     /**
         * Set the value for the current thread.
         */
        public final void set(V value) {
            if (value != InternalThreadLocalMap.UNSET) { // UNSET is a sentinel Object internal to Netty; the backing array is initialized with it, so passing it means "remove"
                // Get the current thread's InternalThreadLocalMap
                InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
                // Store the value
                if (setKnownNotUnset(threadLocalMap, value)) {
                    // Register a cleaner for plain JDK threads to prevent memory leaks
                    registerCleaner(threadLocalMap);
                }
            } else {
                remove(); // remove the value so it can be reclaimed
            }
            }
        }
    
    InternalThreadLocalMap.get()
    public static InternalThreadLocalMap get() {
            Thread thread = Thread.currentThread();
            if (thread instanceof FastThreadLocalThread) {
                return fastGet((FastThreadLocalThread) thread); // read the field on the FastThreadLocalThread
            } else {
                return slowGet(); // any other thread, typically a plain Thread
            }
        }
    
        private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
            InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
            if (threadLocalMap == null) { // create one if absent
                thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
            }
            return threadLocalMap;
        }
    
        private static InternalThreadLocalMap slowGet() {
            // Non-FastThreadLocalThread threads keep their map in a JDK ThreadLocal declared in UnpaddedInternalThreadLocalMap:
            // static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
            ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
            
            InternalThreadLocalMap ret = slowThreadLocalMap.get();
            if (ret == null) {
                ret = new InternalThreadLocalMap(); // create one if absent
                slowThreadLocalMap.set(ret);
            }
            return ret;
        }
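The split between a fast path (a field on a dedicated thread class) and a slow path (a JDK ThreadLocal fallback) can be reproduced in a few lines. In this sketch the names CarrierThread, table() and runOnCarrier() are hypothetical, and a bare Object[] stands in for the map.

```java
// Hypothetical sketch of a fast-path/slow-path lookup; not Netty's code.
final class MapLookupSketch {

    /** Thread subclass that carries its own table in a field (the fast path). */
    static final class CarrierThread extends Thread {
        Object[] table;                        // touched only by the thread itself
        CarrierThread(Runnable task) { super(task); }
    }

    // Fallback storage for every other kind of thread (the slow path).
    private static final ThreadLocal<Object[]> SLOW = new ThreadLocal<>();

    static Object[] table() {
        Thread t = Thread.currentThread();
        if (t instanceof CarrierThread) {
            CarrierThread c = (CarrierThread) t;
            if (c.table == null) {
                c.table = new Object[32];      // create lazily on first use
            }
            return c.table;
        }
        Object[] table = SLOW.get();           // goes through the JDK ThreadLocalMap
        if (table == null) {
            table = new Object[32];
            SLOW.set(table);
        }
        return table;
    }

    /** Runs task on a CarrierThread and waits for it to finish. */
    static CarrierThread runOnCarrier(Runnable task) {
        CarrierThread t = new CarrierThread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return t;
    }
}
```

On the fast path the lookup is a type check plus a field read; only threads that were not created by the dedicated factory pay for the hash-based ThreadLocal lookup.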
    
    InternalThreadLocalMap initialization
       
      UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
            this.indexedVariables = indexedVariables;
        }
      private InternalThreadLocalMap() {
            super(newIndexedVariableTable()); // the superclass constructor stores the array in indexedVariables
        }
    
        private static Object[] newIndexedVariableTable() { // a 32-slot array filled with UNSET
            Object[] array = new Object[32];
            Arrays.fill(array, UNSET);
            return array;
        }
    
    setKnownNotUnset
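    // Stores the value and records this FastThreadLocal so it can be cleared when the thread finishes or removeAll() is called
    private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
            if (threadLocalMap.setIndexedVariable(index, value)) { // index is this variable's slot in the backing array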
                addToVariablesToRemove(threadLocalMap, this);
                return true;
            }
            return false;
        }
    
    // Records the variable so that it can be removed later
    @SuppressWarnings("unchecked")
      private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
            // Everything lives in UnpaddedInternalThreadLocalMap.indexedVariables, including this set
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
            Set<FastThreadLocal<?>> variablesToRemove;
            // The slot at variablesToRemoveIndex holds a Set of every FastThreadLocal that has a value on this thread;
            // a Set is used so that the same FastThreadLocal is never recorded twice
            if (v == InternalThreadLocalMap.UNSET || v == null) {
                variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            
                threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
            } else {
                variablesToRemove = (Set<FastThreadLocal<?>>) v;
            }
    
            variablesToRemove.add(variable); // add to the to-remove set
        }
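The set above is built with Collections.newSetFromMap over an IdentityHashMap, so membership is decided by reference identity (==) rather than equals(). A small sketch, with a hypothetical class name, shows the difference:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical demo class; the JDK calls used are standard.
final class IdentitySetSketch {

    static <T> Set<T> newIdentitySet() {
        return Collections.newSetFromMap(new IdentityHashMap<T, Boolean>());
    }

    public static void main(String[] args) {
        Set<String> set = newIdentitySet();
        String a = new String("key");
        String b = new String("key");      // equals(a), but a different instance
        set.add(a);
        set.add(a);                        // same instance: ignored
        set.add(b);                        // distinct instance: kept
        System.out.println(set.size());    // prints 2
    }
}
```

For tracking variable instances this is a sensible choice: each instance is recorded exactly once, regardless of how a subclass might define equals() or hashCode().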
    
    // Method in InternalThreadLocalMap
    /**
         * @return {@code true} if and only if a new thread-local variable has been created
         */
        public boolean setIndexedVariable(int index, Object value) {
            Object[] lookup = indexedVariables;
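            if (index < lookup.length) { // fits in the current array, no expansion needed
                Object oldValue = lookup[index];
                lookup[index] = value;
                return oldValue == UNSET; // false only when an existing value is overwritten
            } else {
                expandIndexedVariableTableAndSet(index, value); // grow the backing array, similar to how HashMap grows its table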
                return true;
            }
        }
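expandIndexedVariableTableAndSet is not shown in this article. The sketch below illustrates one plausible way to grow such an array; it is an assumption, not a copy of Netty's method. The growth policy (the smallest power of two above the requested index), the initial size of 4 and all names are hypothetical, and very large indexes near Integer.MAX_VALUE are not handled.

```java
import java.util.Arrays;

// Hypothetical sketch of an index-addressed array that grows on demand.
final class GrowableSlotsSketch {
    static final Object UNSET = new Object();          // sentinel for "no value"

    private Object[] slots = fillFrom(new Object[4], 0);

    private static Object[] fillFrom(Object[] a, int from) {
        Arrays.fill(a, from, a.length, UNSET);
        return a;
    }

    /** Returns true if the slot did not hold a value before. */
    boolean set(int index, Object value) {
        if (index < slots.length) {
            Object old = slots[index];
            slots[index] = value;
            return old == UNSET;
        }
        expandAndSet(index, value);
        return true;
    }

    Object get(int index) {
        return index < slots.length ? slots[index] : UNSET;
    }

    int capacity() {
        return slots.length;
    }

    private void expandAndSet(int index, Object value) {
        int oldCapacity = slots.length;
        // Assumed policy: smallest power of two strictly greater than index.
        int newCapacity = Integer.highestOneBit(index) << 1;
        Object[] grown = Arrays.copyOf(slots, newCapacity);
        fillFrom(grown, oldCapacity);                   // new slots start as UNSET
        grown[index] = value;
        slots = grown;
    }
}
```

Unlike a hash table, growing here is a plain copy: indexes never change, so nothing has to be rehashed.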
    

    // Registers the current thread's threadLocalMap with ObjectCleaner, so that once the thread is garbage-collected the map's values are removed
        private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
            Thread current = Thread.currentThread();
            // Not needed for a FastThreadLocalThread whose Runnable was wrapped, because FastThreadLocalRunnable calls removeAll() when the task finishes
            // cleanerFlagIndex records whether a cleaner was already registered, so registration happens only once
            if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||
                threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
                return;
            }
            // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
            // of the thread, and this Object will be discarded if the associated thread is GCed.
            threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);
    
            // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
            // and FastThreadLocal.onRemoval(...) will be called.
            ObjectCleaner.register(current, new Runnable() {
                @Override
                public void run() {
                    remove(threadLocalMap); // runs once the current thread has been collected by GC; removes this variable's value from that thread's map
    
                    // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
                    // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
                }
            });
        }
    
    ObjectCleaner

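    // This is the core of the leak prevention: when a thread registered by FastThreadLocal is garbage-collected, the cleaner thread in this class runs the registered task to clear the data from the map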

    package io.netty.util.internal;
    
    import io.netty.util.concurrent.FastThreadLocalThread;
    
    import java.lang.ref.ReferenceQueue;
    import java.lang.ref.WeakReference;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    import static io.netty.util.internal.SystemPropertyUtil.getInt;
    import static java.lang.Math.max;
    
    /**
     * Allows a way to register some {@link Runnable} that will executed once there are no references to an {@link Object}
     * anymore.
     */
    public final class ObjectCleaner {
        private static final int REFERENCE_QUEUE_POLL_TIMEOUT_MS =
                max(500, getInt("io.netty.util.internal.ObjectCleaner.refQueuePollTimeout", 10000));
    
        // Package-private for testing
        static final String CLEANER_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread";
        // This will hold a reference to the AutomaticCleanerReference which will be removed once we called cleanup()
        private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();
        private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();
        private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean(false);
        private static final Runnable CLEANER_TASK = new Runnable() {
            @Override
            public void run() {
                boolean interrupted = false;
                for (;;) {
                    // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
                    // See if we can let this thread complete.
                    while (!LIVE_SET.isEmpty()) {
                        final AutomaticCleanerReference reference;
                        try {
                            reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS); // yields an AutomaticCleanerReference once a registered thread has been collected
                        } catch (InterruptedException ex) {
                            // Just consume and move on
                            interrupted = true;
                            continue;
                        }
                        if (reference != null) {
                            try {
                                reference.cleanup(); // run the task that clears the threadLocalMap
                            } catch (Throwable ignored) {
                                // ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
                                // other unexpected side effects.
                            }
                            LIVE_SET.remove(reference);
                        }
                    }
                    CLEANER_RUNNING.set(false);
    
                    // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
                    // behavior in multi-threaded environments.
                    if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
                        // There was nothing added after we set STARTED to false or some other cleanup Thread
                        // was started already so its safe to let this Thread complete now.
                        break;
                    }
                }
                if (interrupted) {
                    // As we caught the InterruptedException above we should mark the Thread as interrupted.
                    Thread.currentThread().interrupt();
                }
            }
        };
    
        /**
         * Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
         * to the object anymore.
         *
         * This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
         * anymore because it is not a cheap way to handle the cleanup.
         */
        // Wraps the object (here, the thread) and its cleanup task in an AutomaticCleanerReference and adds it to LIVE_SET
        public static void register(Object object, Runnable cleanupTask) {
            // AutomaticCleanerReference extends WeakReference
            AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
                    ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
            
            // Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
            // behavior in multi-threaded environments.
            LIVE_SET.add(reference);
    
            // Check if there is already a cleaner running.
            if (CLEANER_RUNNING.compareAndSet(false, true)) { 
                // The CAS succeeded, so no cleaner thread is running yet: start one to run CLEANER_TASK
                final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
                cleanupThread.setPriority(Thread.MIN_PRIORITY); // lowest priority
                // Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
                // classloader.
                // See:
                // - https://github.com/netty/netty/issues/7290
                // - https://bugs.openjdk.java.net/browse/JDK-7008595
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    @Override
                    public Void run() {
                        cleanupThread.setContextClassLoader(null);
                        return null;
                    }
                });
                cleanupThread.setName(CLEANER_THREAD_NAME);
    
                // Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is
                // running.
                cleanupThread.setDaemon(true);
                cleanupThread.start();
            }
        }
    
        public static int getLiveSetCount() {
            return LIVE_SET.size();
        }
    
        private ObjectCleaner() {
            // Only contains a static method.
        }
    
        private static final class AutomaticCleanerReference extends WeakReference<Object> {
            private final Runnable cleanupTask;
            // AutomaticCleanerReference extends WeakReference: once the referent has been collected, the reference object is
            // placed on REFERENCE_QUEUE, which lets us detect the collection and run the cleanup task afterwards
            AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
                super(referent, REFERENCE_QUEUE);
                this.cleanupTask = cleanupTask;
            }
            // Run the cleanup task
            void cleanup() {
                cleanupTask.run(); 
            }
    
            @Override
            public Thread get() {
                return null;
            }
    
            @Override
            public void clear() { // also remove this reference from LIVE_SET
                LIVE_SET.remove(this);
                super.clear();
            }
        }
    }
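The mechanism ObjectCleaner relies on, a WeakReference registered with a ReferenceQueue, can be shown in isolation. The sketch below uses hypothetical names and drains the queue on demand instead of running a background thread. In real use the garbage collector enqueues the reference; a deterministic demonstration can call Reference.enqueue() by hand.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

// Hypothetical sketch of cleanup driven by a ReferenceQueue; not Netty's ObjectCleaner.
final class CleanerSketch {
    static final ReferenceQueue<Object> QUEUE = new ReferenceQueue<>();

    /** Weak reference that remembers what to run once its referent is gone. */
    static final class CleanupRef extends WeakReference<Object> {
        private final Runnable task;

        CleanupRef(Object referent, Runnable task) {
            super(referent, QUEUE);        // enqueued on QUEUE after the referent is collected
            this.task = task;
        }

        void cleanup() {
            task.run();
        }
    }

    /** Runs the task of every reference that is already on the queue; returns how many ran. */
    static int drain() {
        int count = 0;
        Reference<?> ref;
        while ((ref = QUEUE.poll()) != null) {
            ((CleanupRef) ref).cleanup();
            count++;
        }
        return count;
    }
}
```

As in ObjectCleaner's LIVE_SET, something must keep a strong reference to each CleanupRef until its task has run; a reference object that is itself collected is never enqueued.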
    
    

    remove

    Performing the removal

     public final void remove() {
            remove(InternalThreadLocalMap.getIfSet()); // get the current thread's InternalThreadLocalMap, if any
        }
    
        /**
         * Sets the value to uninitialized for the specified thread local map;
         * a proceeding call to get() will trigger a call to initialValue().
         * The specified thread local map must be for the current thread.
         */
        @SuppressWarnings("unchecked")
        public final void remove(InternalThreadLocalMap threadLocalMap) {
            if (threadLocalMap == null) {
                return;
            }
    
            Object v = threadLocalMap.removeIndexedVariable(index); // clear the value
            removeFromVariablesToRemove(threadLocalMap, this);
            
            if (v != InternalThreadLocalMap.UNSET) {
                try {
                    onRemoval((V) v); // the default implementation does nothing
                } catch (Exception e) {
                    PlatformDependent.throwException(e);
                }
            }
        }
    
    // Removes this FastThreadLocal from the to-remove set
     private static void removeFromVariablesToRemove(
                InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    
            if (v == InternalThreadLocalMap.UNSET || v == null) {
                return;
            }
        
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            variablesToRemove.remove(variable);
        }
    
    // Method in InternalThreadLocalMap
     public Object removeIndexedVariable(int index) {
            Object[] lookup = indexedVariables;
            if (index < lookup.length) { // clear the slot
                Object v = lookup[index];
                lookup[index] = UNSET; // reset to the default
                return v;
            } else {
                return UNSET;
            }
        }
    
    FastThreadLocal.removeAll()
    // Called from FastThreadLocalRunnable after the current thread's task has finished, to prevent memory leaks
    
    public static void removeAll() {
            // Get the current thread's InternalThreadLocalMap
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
           
            if (threadLocalMap == null) {
                return;
            }
    
            try {
                Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
                if (v != null && v != InternalThreadLocalMap.UNSET) {
                    @SuppressWarnings("unchecked")
                    Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
                    FastThreadLocal<?>[] variablesToRemoveArray =
                            variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
                    for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                        tlv.remove(threadLocalMap); // clear this FastThreadLocal's value
                    }
                }
            } finally {
                InternalThreadLocalMap.remove();
            }
        }
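removeAll() copies the set into an array before looping, because each remove(...) call also takes the variable out of that same set. A sketch of why the snapshot matters, using hypothetical names:

```java
import java.util.Set;

// Hypothetical sketch of snapshot-then-remove; not Netty's code.
final class SnapshotRemovalSketch {

    /** A variable that unregisters itself from the registry when removed. */
    static final class Var {
        boolean removed;

        void remove(Set<Var> registry) {
            removed = true;
            registry.remove(this);         // mutates the set that is being cleaned up
        }
    }

    static void removeAll(Set<Var> registry) {
        // Copy first: iterating the set directly while remove() mutates it
        // may throw ConcurrentModificationException.
        Var[] snapshot = registry.toArray(new Var[0]);
        for (Var v : snapshot) {
            v.remove(registry);
        }
    }
}
```

The copy costs one small array allocation per call, which is acceptable because removeAll() runs once when a task finishes rather than on every access.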
    
    get
     @SuppressWarnings("unchecked")
        public final V get() {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            Object v = threadLocalMap.indexedVariable(index);
            if (v != InternalThreadLocalMap.UNSET) {
                return (V) v;
            }
    
            V value = initialize(threadLocalMap); // initialize via initialValue(), which returns null by default
            registerCleaner(threadLocalMap); // register the cleaner
            return value;
        }
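The reason get() compares against UNSET instead of null is that null can be a legitimate value. The single-slot sketch below uses hypothetical names, with a Supplier standing in for the initial-value hook, and shows that initialization then runs once even when the initial value is null.

```java
import java.util.function.Supplier;

// Hypothetical single-slot sketch of the get()/initialize pattern; not Netty's code.
final class LazySlotSketch<V> {
    private static final Object UNSET = new Object();   // sentinel: "no value yet"

    private final Supplier<V> initialValue;
    private Object slot = UNSET;
    int initCalls;                                       // exposed only for the demonstration

    LazySlotSketch(Supplier<V> initialValue) {
        this.initialValue = initialValue;
    }

    @SuppressWarnings("unchecked")
    V get() {
        if (slot != UNSET) {
            return (V) slot;           // may legitimately be null
        }
        initCalls++;
        V v = initialValue.get();
        slot = v;
        return v;
    }

    void remove() {
        slot = UNSET;                  // the next get() initializes again
    }
}
```

With a null check instead of the sentinel, a variable whose initial value is null would be re-initialized on every read.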
    

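    Summary:

    1. As the code shows, Netty pairs FastThreadLocal with its own supporting classes: a thread, a thread-local map, a Runnable wrapper, and so on.

    2. To prevent memory leaks, FastThreadLocal clears map data differently for Netty's own threads and for user-defined threads.

    3. Like the JDK, it stores thread-local values in an array, but Netty uses each FastThreadLocal's index to address its slot directly, which is fast. Bear in mind that every FastThreadLocal occupies two slots (index and cleanerFlagIndex), and that all of them share the slot at variablesToRemoveIndex, which holds the set of variables to clear. Compared with the JDK's ThreadLocal, this trades space for time.

    4. On a thread that is not a FastThreadLocalThread, the map is stored in a JDK ThreadLocal underneath. As noted in point 2, both kinds of thread have a mechanism for preventing leaks.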

          Article link: https://www.haomeiwen.com/subject/fdyurftx.html