美文网首页深入浅出Netty源码剖析
Netty源码-FastThreadLocal原理

Netty源码-FastThreadLocal原理

作者: persisting_ | 来源:发表于2019-05-23 22:42 被阅读8次

1 概述

Netty为了提高应用性能做了许多方面的努力,本篇文章介绍其实现的其改进版的ThreadLocal实现FastThreadLocal,Netty实现的FastThreadLocal相较于Java原生优化了ThreadLocal的访问速度、内存泄漏等方面性能。

阅读过Java ThreadLocal源码的都知道其实现原理,Java的每个Thread实例都有ThreadLocal.ThreadLocalMap类型的字段threadLocals,我们使用的ThreadLocal对象实例则作为key,实际的值为value,ThreadLocalMap使用数组保存key-value,具体的数据结构是hash列表并使用线性探测再散列解决hash冲突。还有一点要注意的是上述key-value组成的EntryWeakReference子类,所以在对ThreadLocal进行setget时会删除被GC回收的无效Entry,在使用不当时可能会造成内存泄漏,另外使用hash列表实现的底层数据结构也具有较高的时间复杂度。

Netty实现的FastThreadLocal底层也是通过数据维护Key-value对象,与Java原生ThreadLocal使用ThreadLocal作为Key不同的是,FastThreadLocal通过保存数组下标实现了对value的快速访问。同时FastThreadLocal也实现了多种方式避免了内存泄漏问题,下面会对这些内容进行分别介绍。

为了叙述方便,下文使用FTL指代Netty的FastThreadLocal,使用TL指代Java原生ThreadLocal

2 FTL相关数据结构和类结构

Netty为了采用FTL,在Java Thread的基础上实现了自己的FastThreadLocalThread。为了理解FTL,我们需要关注FastThreadLocalThread的两个字段cleanupFastThreadLocalsthreadLocalMap,其中cleanupFastThreadLocals和FTL是否能主动清理有关,我们后面会介绍,threadLocalMap则类似JavaThread类的threadLocals,用于保存该FastThreadLocalThread持有的所有FTL数据。

2.1 InternalThreadLocalMap

FastThreadLocalThread.threadLocalMapInternalThreadLocalMap对象实例。
在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:

//InternalThreadLocalMap
private InternalThreadLocalMap() {
    //为了简便,InternalThreadLocalMap父类
    //UnpaddedInternalThreadLocalMap不展开介绍
    super(newIndexedVariableTable());
}
//默认的数组大小为32,且使用UNSET对象填充数组
//如果下标处数据为UNSET,则表示没有数据
private static Object[] newIndexedVariableTable() {
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    return array;
}

InternalThreadLocalMap源码可以发现,为了避免伪共享(false sharing)问题,其使用了缓存行填充技术,在类定义中声明了如下long字段进行填充,具体可以参考Disruptor相关文献,基本上所有介绍Disruptor的文章都会提到伪共享问题,在Java8中则可以使用@sun.misc.Contended注解避免伪共享问题。

//InternalThreadLocalMap
// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

上面我们说到FTL保存了数组下标,FTL使用的数组下标是由InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:

//InternalThreadLocalMap

static final AtomicInteger nextIndex = new AtomicInteger();


public static int nextVariableIndex() {
    //Netty中所有FTL数组下标都是通过递增这个静态变量实现的
    //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,
    //会造成InternalThreadLocalMap中数组不必要的自动扩容
    //这个问题我在文章最后会列出来供讨论。
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
        nextIndex.decrementAndGet();
        throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
}

InternalThreadLocalMap中数组大小默认为32,在FTL构造函数中会调用上面的InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex()使用静态域nextIndex递增维护所有FTL的下标,会造成后面实例化的FTL下标过大,如果FTL下标大于其对应FastThreadLocalThread.threadLocalMap数组的Length,会进行数组的自动扩容,如下:

//InternalThreadLocalMap
private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;

    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}

2.2 FTL类结构

和TL一样,为了使用FTL,我们需要实现其子类,重写初始化函数,在第一次获取时会使用初始化函数initialValue对FTL实际持有的数据进行初始化:

/**
* Returns the initial value for this thread-local variable.
*/
protected V initialValue() throws Exception {
    return null;
}

//FTL还提供了onRemoval钩子函数,
//让使用者在该FTL被移除时可以有机会做些操作。
/**
* Invoked when this thread local variable is removed by {@link #remove()}.
*/
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }

上面的继承、初始化原理和TL差不多,不再介绍,如想看实例,则可以看PooledByteBufAllocator.PoolThreadLocalCache的源码。

下面再介绍FTL的三个非常重要的下标域,和TL不同的是,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的对象(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex和cleanerFlagIndex,所以一个FTL的一个实例最多会消耗三个下标,也就最多会占用三个数组位置。

//FastThreadLocal
//如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在
//该FastThreadLocalThread.threadLocalMap数组的
//variablesToRemoveIndex下标处放置一个IdentityHashMap,
//并将该FTL放入IdentityHashMap中,在后续清理时会取出
//variablesToRemoveIndex下标处的IdentityHashMap进行清理
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

//在threadLocalMap数组中存放实际数据的下标
private final int index;

//存放是否已经加入到后台清理线程队列的标志
private final int cleanerFlagIndex;

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
    cleanerFlagIndex = InternalThreadLocalMap.nextVariableIndex();
}

下面看FTL的get方法实现:

//FastThreadLocal
/**
* Returns the current value for the current thread
*/
@SuppressWarnings("unchecked")
public final V get() {
    //初始化或获取该线程的threadLocalMap,下面会分析其源码实现
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    //获取数组在该FTL index下标处的元素
    Object v = threadLocalMap.indexedVariable(index);
    //如果不是默认填充的UNSET,则表示是有效数据,直接返回
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }

    //如果没有有效数据,则进行初始化initialize会调用子类
    //实现的initialValue获取初始化数据,如下面列出源码所示
    V value = initialize(threadLocalMap);
    //如果需要的话,将该FTL注册到后台清理任务中,
    //后文在介绍清理时会介绍
    registerCleaner(threadLocalMap);
    return value;
}

 private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
    //设置该下标处的值为初始值,如果下标大于数组容量,则会进行扩容
    threadLocalMap.setIndexedVariable(index, v);
    //将FTL放到该map数组下标variablesToRemoveIndex对应的
    //IdentityHashMap中为后续清理做准备。
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

下面看InternalThreadLocalMap.get()实现:

//InternalThreadLocalMap
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    //首先看当前thread是否为FastThreadLocalThread实例
    //如果是的话,上文介绍过,可以快速获取到其threadLocalMap
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        //如果不是,则慢速获取到其threadLocalMap
        return slowGet();
    }
}

//如果当前thread是FastThreadLocalThread实例,则直接获取其
//threadLocalMap域,第一次获取则进行初始化
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}

//如果当前thread不是FastThreadLocalThread实例,则在Java
//原生ThreadLocal中放置一个InternalThreadLocalMap,然后再
//返回,InternalThreadLocalMap放在原生TL中,获取速度比较慢
private static InternalThreadLocalMap slowGet() {
    //见下面代码,这样访问会在Java原生ThreadLocal
    //中放置一个InternalThreadLocalMap
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

//UnpaddedInternalThreadLocalMap
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();

3 FTL数据设置或获取

3.1 数据放置

数据放置比较简单,主要涉及到FTL的getset方法,get方法上面已经列出过源码,下面主要看set方法:

//FastThreadLocal
/**
* Set the value for the current thread.
*/
public final void set(V value) {
    //如果设置的值不为默认填充的UNSET才进行设置
    if (value != InternalThreadLocalMap.UNSET) {
        //获取该线程对应的map对象
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        //在map对象中放置该值
        if (setKnownNotUnset(threadLocalMap, value)) {
            //如果需要的话,将该map注册到后台清理任务中,
            //后文在介绍清理时会介绍
            registerCleaner(threadLocalMap);
        }
    } else {
        //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理
        remove();
    }
}


/**
* @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}.
*/
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    //在数组下标index处放置实际对象,如果index大于数组length,
    //会进行数组扩容,可见上面的源码。
    //放置成功之后,将该FTL加入到variablesToRemoveIndex下标的
    //IdentityHashMap,等待后续清理
    if (threadLocalMap.setIndexedVariable(index, value)) {
        addToVariablesToRemove(threadLocalMap, this);
        return true;
    }
    return false;
}

//该FTL加入到variablesToRemoveIndex下标的IdentityHashMap
//IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    //获取variablesToRemoveIndex下标处的IdentityHashMap
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    //如果是第一次获取,则variablesToRemoveIndex下标处的值
    //为UNSET,所以下面会新建一个新的IdentityHashMap并
    //放入到下标variablesToRemoveIndex处
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        //如果不是第一次访问variablesToRemoveIndex下标,则该
        //下标对应的元素已经是IdentityHashMap,所以直接类型转换
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }
    //将该FTL放入该IdentityHashMap中
    variablesToRemove.add(variable);
}

3.2 数据获取

FTL数据获取见get方法,上面已经列出其源码,这里不再赘述。

4 清理

4.1 主动清理

上面在介绍FastThreadLocalThread类时,提到其有一个字段为cleanupFastThreadLocals,这个字段则用于标识该线程在结束时是否会主动清理FTL。下面看FastThreadLocalThread构造函数,被FastThreadLocalRunnable.wrap方法修饰的Runnable会将cleanupFastThreadLocals置为true,表示FTL会在线程结束时被主动清理,wrap方法会把原Runnable.run方法放在try里,然后在finally中调用FastThreadLocal.removeAll()方法,该方法会对FTL进行清理,具体可看下面列出的源码。没有被wrap的则cleanupFastThreadLocals为false,需要将FTL放入后台清理线程的队列中,其实就对应上面出现多次的registerCleaner方法:

//FastThreadLocalThread
public FastThreadLocalThread() {
    cleanupFastThreadLocals = false;
}

public FastThreadLocalThread(Runnable target) {
    super(FastThreadLocalRunnable.wrap(target));
    cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(ThreadGroup group, Runnable target) {
    super(group, FastThreadLocalRunnable.wrap(target));
    cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(String name) {
    super(name);
    cleanupFastThreadLocals = false;
}

public FastThreadLocalThread(ThreadGroup group, String name) {
    super(group, name);
    cleanupFastThreadLocals = false;
}

public FastThreadLocalThread(Runnable target, String name) {
    super(FastThreadLocalRunnable.wrap(target), name);
    cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
    super(group, FastThreadLocalRunnable.wrap(target), name);
    cleanupFastThreadLocals = true;
}

public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
    super(group, FastThreadLocalRunnable.wrap(target), name, stackSize);
    cleanupFastThreadLocals = true;
}

//FastThreadLocalRunnable
final class FastThreadLocalRunnable implements Runnable {
    private final Runnable runnable;

    private FastThreadLocalRunnable(Runnable runnable) {
        this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
    }

    @Override
    public void run() {
        try {
            runnable.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }

    static Runnable wrap(Runnable runnable) {
        //被wrap的Runable会变成FastThreadLocalRunnable对象
        //FastThreadLocalRunnable在run方法的finally会调用
        //FastThreadLocal.removeAll();在线程结束时对FTL
        //进行主动清理
        return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
    }
}

主动清理调用的FastThreadLocal.removeAll()源码如下:

//FastThreadLocal
/**
* Removes all {@link FastThreadLocal} variables bound to the current thread.  This operation is useful when you
* are in a container environment, and you don't want to leave the thread local variables in the threads you do not
* manage.
*/
public static void removeAll() {
    //首先看能够获取该线程对应的threadLocalMap,如果从来没有
    //访问过该线程的FTL,则获取的为null,不用执行任何清理,直接
    //return
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        //否则获取下标variablesToRemoveIndex上对应的IdentityHashMap
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        //成功获取到IdentityHashMap
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            //转成数组
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
            //对IdentityHashMap中的每个FTL,进行清理
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        //因为这里是线程结束的主动清理,线程结束则threadLocalMap
        //也不再会被使用,所以这里清理线程的threadLocalMap
        InternalThreadLocalMap.remove();
    }
}


 /**
* Sets the value to uninitialized for the specified thread local map;
* a proceeding call to get() will trigger a call to initialValue().
* The specified thread local map must be for the current thread.
*/
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }
    //移除下标index处的实际数据(应用程序初始化的数据)
    Object v = threadLocalMap.removeIndexedVariable(index);
    //获取下标variablesToRemoveIndex处的IdentityHashMap,并将该
    //IdentityHashMap中记录的正在移除的FTL删除
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            //这里调用上面2.2节提到的可供用户自定义的onRemoval钩子函
            //数
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}

线程主动清理的实现比较简单,首先找到线程的threadLocalMap,因为每次在访问FTL时,FTL都会将自己放置到数组下标variablesToRemoveIndex处的IdentityHashMap中,清理时也就找到在下标variablesToRemoveIndex处的IdentityHashMap,移除所有的FTL,最后线程退出,将线程的threadLocalMap置空。

4.2 后台线程清理

没能被FastThreadLocalRunnable.wrap方法修饰的Runnable任务,无法通过finally语句块做到主动清理,因此FastThreadLocalThread.cleanupFastThreadLocals = false,上面在介绍FastThreadLocalsetget方法时,发现都调用了registerCleaner方法,这个方法就是将清理任务注册到专门负责清理工作的后台线程中,下面看其源码:

//FastThreadLocal
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
    Thread current = Thread.currentThread();
    //willCleanupFastThreadLocals就是排除上面主动清理的情形,如果
    //线程能自己主动清理,那么不需要将清理任务注册到后台清理线程中
    if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||
        //数组下标cleanerFlagIndex在上面介绍过,主要为了标记
        //该FTL有没有被注册到后台清理线程中,第一次注册之后,
        //会将该下标对应的数组位置置为true,如果已经注册过,
        //则不用重复注册
        threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
        return;
    }
    // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
    // of the thread, and this Object will be discarded if the associated thread is GCed.
    //如果不是主动清理,且第一次注册,则将该下标对应的
    //数组元素置为true,表示已经注册过
    threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);

    // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
    // and FastThreadLocal.onRemoval(...) will be called.
    //向后台线程注册一个清理任务
    ObjectCleaner.register(current, new Runnable() {
        @Override
        public void run() {
            remove(threadLocalMap);

            // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
            // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
        }
    });
}

这里run函数里实际清理方法remove(threadLocalMap)的实现的实现在上面介绍主动清理时已经介绍,下面主要看ObjectCleaner.register实现:

//ObjectCleaner
/**
* Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
* to the object anymore.
*
* This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
* anymore because it is not a cheap way to handle the cleanup.
*/
public static void register(Object object, Runnable cleanupTask) {
    //根据当前线程和上面的清理任务构造一个reference
    //AutomaticCleanerReference是WeakReference子类
    AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
            ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
    // Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
    // behavior in multi-threaded environments.
    //将该reference放入LIVE_SET中,等待清理线程清理
    LIVE_SET.add(reference);

    //如果清理线程已经退出,则新建一个清理线程
    // Check if there is already a cleaner running.
    if (CLEANER_RUNNING.compareAndSet(false, true)) {
        //该线程实际工作由Runnable CLEANER_TASK定义
        final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
        cleanupThread.setPriority(Thread.MIN_PRIORITY);
        // Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
        // classloader.
        // See:
        // - https://github.com/netty/netty/issues/7290
        // - https://bugs.openjdk.java.net/browse/JDK-7008595
        //避免classLoader造成内存泄漏,所以设置线程上下文
        //加载器为null
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                cleanupThread.setContextClassLoader(null);
                return null;
            }
        });
        cleanupThread.setName(CLEANER_THREAD_NAME);

        //设置为daemon线程,避免不退出
        // Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is
        // running.
        cleanupThread.setDaemon(true);
        cleanupThread.start();
    }
}

后台清理线程的实际工作定义在CLEANER_TASK中,所以下面看其源码实现:

//ObjectCleaner
private static final Runnable CLEANER_TASK = new Runnable() {
    @Override
    public void run() {
        boolean interrupted = false;
        for (;;) {
            // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
            // See if we can let this thread complete.
            //如果LIVE_SET不为空,则表示还有清理任务没有完成
            //使用while循环进行清理
            while (!LIVE_SET.isEmpty()) {
                //AutomaticCleanerReference是WeakReference子类
                //所以如果没有强引用,那么在GC时会被回收
                //并将引用自身放入队列中
                final AutomaticCleanerReference reference;
                try {
                    //这里就从队列中取出被回收的reference
                    //remove可能会被中断,在catch中会记录
                    //中断标志
                    reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
                } catch (InterruptedException ex) {
                    // Just consume and move on
                    //记录中断标志
                    interrupted = true;
                    continue;
                }
                if (reference != null) {
                    try {
                        //这里的cleanup方法实际上调用的是
                        //FastThreadLocal注册的Runnable,主要
                        //调用remove(threadLocalMap)方法
                        reference.cleanup();
                    } catch (Throwable ignored) {
                        // ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
                        // other unexpected side effects.
                    }
                    //清理完毕从LIVE_SET中移除该引用
                    LIVE_SET.remove(reference);
                }
            }//while
            //设置该清理线程运行状态为非运行
            CLEANER_RUNNING.set(false);

            // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
            // behavior in multi-threaded environments.
            //发生下面两种情况线程可以结束:
            //(1)如果LIVE_SET为空,已经没有清理任务
            //(2)上面已经将CLEANER_RUNNING置为false,如果
            //下面compareAndSet(false, true)失败,
            //则表示已经有其他线程注册清理
            //任务时重新启动了一个新的线程,所以为了保证只有一个
            //线程在运行,这里也退出线程
            if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
                // There was nothing added after we set STARTED to false or some other cleanup Thread
                // was started already so its safe to let this Thread complete now.
                break;
            }
        }//for
        //如果线程被中断过,则恢复中断标志
        if (interrupted) {
            // As we caught the InterruptedException above we should mark the Thread as interrupted.
            Thread.currentThread().interrupt();
        }
    }
};

5 阅读源码发现的一个问题

上面说到FTL会维护三个下标,一个保存实际的数据,其他两个和数据清理有关,且三个下标都由InternalThreadLocalMap的静态域nextIndex递增产生,每个threadLocalMap数组默认大小为32,这样就会有一个问题,如果有12个线程,每个线程声明了1个FTL,且每个FTL都实际消耗了三个下标,那么前面11个线程的FTL会将nextIndex递增为32,第12个线程的FTL数据下标为34(这里是34,因为FTL的variablesToRemoveIndex先于数据下标index被初始化为33),该线程的threadLocalMap容量为32,但是因为该FTL下标为34,所以在设置数据时,如下面代码所示,会触发一次扩容,虽然数组容量32一个位置都还没有被实际使用:

//InternalThreadLocalMap
public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    //index为34, lookup.length为32,所以进入else进行扩容
    if (index < lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

相关文章

网友评论

    本文标题:Netty源码-FastThreadLocal原理

    本文链接:https://www.haomeiwen.com/subject/zcsizqtx.html