美文网首页Java
FastThreadLocal详解

FastThreadLocal详解

作者: 爱健身的兔子 | 来源:发表于2020-10-26 09:26 被阅读0次

    FastThreadLocal的引入背景和原理简介

    既然jdk已经有ThreadLocal,为何netty还要自己造个FastThreadLocal?FastThreadLocal快在哪里?

    这需要从jdk ThreadLocal的本身说起。如下图:


    jdk ThreadLocal

    在java线程中,每个线程都有一个ThreadLocalMap实例变量(如果不使用ThreadLocal,不会创建这个Map,一个线程第一次访问某个ThreadLocal变量时,才会创建)。该Map是使用线性探测的方式解决hash冲突的问题,如果没有找到空闲的slot,就不断往后尝试,直到找到一个空闲的位置,插入entry,这种方式在经常遇到hash冲突时,影响效率。

    FastThreadLocal(下文简称ftl)直接使用数组避免了hash冲突的发生,具体做法是:每一个FastThreadLocal实例创建时,分配一个下标index;分配index使用AtomicInteger实现,每个FastThreadLocal都能获取到一个不重复的下标。当调用ftl.get()方法获取值时,直接从数组获取返回,如return array[index],如下图:


    netty FastThreadLocal

    FastThreadLocalThread

    在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本。

    ================io.netty.util.concurrent.DefaultThreadFactory =============
    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        // 一般daemon为false,意思是不设置为守护线程
        if (t.isDaemon() != daemon) {
            t.setDaemon(daemon);
        }
        // 优先级 默认为5
        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
        return t;
    }
    
    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
    

    FastThreadLocalThread 继承自Thread类,有如下成员变量:

    ===============io.netty.util.concurrent.FastThreadLocalThread============
    // 任务执行完,是否清除FastThreadLocal的标记
    private final boolean cleanupFastThreadLocals;
    // 类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal
    private InternalThreadLocalMap threadLocalMap;
    

    InternalThreadLocalMap分析

    ftl的实现,涉及到InternalThreadLocalMap。InternalThreadLocalMap 类的继承关系图如下:


    在这里插入图片描述

    UnpaddedInternalThreadLocalMap的主要属性

    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    static final AtomicInteger nextIndex = new AtomicInteger();
    Object[] indexedVariables;
    

    数组 indexedVariables 就是用来存储 ftl 的 value 的,使用下标的方式直接访问。nextIndex在ftl实例创建时用来给每个ftl实例分配一个下标,slowThreadLocalMap在线程不是 FastThreadLocalThread 时使用到。

    InternalThreadLocalMap 底层数据结构

    InternalThreadLocalMap中指定了 indexedVariables 的初始化的值UNSET。

    public static final Object UNSET = new Object();// 用于标识数组的槽位还未使用
    
    private BitSet cleanerFlags;//标记indexedVariables 索引的值是否被
    

    注意:

    有些情况下,我们自己创建的线程并没有使用FastThreadLocalThread,为了适配这种情况,Netty通过Thread原生的ThreadLocal来保存InternalThreadLocalMap。关系图如下:


    netty

    即Thread中的属性threadLocalMap中key是上面定义的:ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap,注意这是一个静态变量。

    如果使用了FastThreadLocalThread,那么存储数据的结构直接使用Netty定义的InternalThreadLocalMap,通过Object[] indexedVariables保存key(FastThreadLocal)和value。nextIndex是一个自增的数组下表。JDK的ThreadLocal是通过Hash计算到下表,遇到冲突通过线性探测法解决。Netty确定了每一个key对应的value在数组中的下标,因此不会有冲突发生。


    Netty

    下面看一下它的构造方法

    private InternalThreadLocalMap() {
      super(newIndexedVariableTable());
    }
    private static Object[] newIndexedVariableTable() {
        // 创建数据,填充Object对象
        Object[] array = new Object[32];
        Arrays.fill(array, UNSET);
        return array;
    }
    

    FastThreadLocal

    FastThreadLocal 的主要属性是 index,作为 InternalThreadLocalMap 的 indexedVariables 数组的下标,在创建时有 InternalThreadLocalMap的 nextIndex 原子类生成一个全局唯一的递增下标。

    ==============================FastThreadLocal================================
    // 常量0,存放FastThreadLocal集合的下标
    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
    // 每一个FastThreadLocal对应的数组的下标
    private final int index;
    
    public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    

    set方法

    set方法的目的是以当前FastThreadLocal为key,设置value到当前FastThreadLocalThread类型线程的 InternalThreadLocalMap实例中。

    ==============================FastThreadLocal================================
    
    public final void set(V value) {
        // UNSET为空的Object对象,不允许为空
        if (value != InternalThreadLocalMap.UNSET) {
                InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
                setKnownNotUnset(threadLocalMap, value);
            } else {
                remove();
          }
    }
    

    InternalThreadLocalMap.get

    ==============InternalThreadLocalMap====================================
    
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();//获取当前
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }
    

    判断当前线程的类型,如果是FastThreadLocalThread类型,则使用该类内部属性InternalThreadLocalMap,否则使用原生Thread对象中的ThreadLocalMap属性来,然后在此map中保存的key为JDK中的ThreadLocal对象,value为InternalThreadLocalMap(Netty中的类)。

    fastGet:

    获取InternalThreadLocalMa,如果不存在那么新创建一个:

    ==============InternalThreadLocalMap====================================
    
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            // 设置到FastThreadLocalThread对象内部的属性中
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }
    

    slowGet:

    获取当前线程ThreadLocaMap中key为slowThreadLocalMap(定义声明在UnpaddedInternalThreadLocalMap的常量),value是InternalThreadLocalMap。

    ==============InternalThreadLocalMap====================================
    
    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
    

    setKnownNotUnset:

    当 InternalThreadLocalMap.get() 返回了 一个 InternalThreadLocalMap,接下来就可以保存数据了。

    =======================FastThreadLocal==================================
    
    private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        // 设置value
        if (threadLocalMap.setIndexedVariable(index, value)) {
            //将key(FastThreadLoca)添加到数组的第一个元素(Set集合)中
            addToVariablesToRemove(threadLocalMap, this);
            return true;
        }
        return false;
    }
    

    setIndexedVariable

    ======================InternalThreadLocalMap==============================
    
    public boolean setIndexedVariable(int index, Object value) {
        // indexedVariables是构造函数中创建的Object数组
        Object[] lookup = indexedVariables;
        // index是从FastThreadLcal中传入的值,初始为1(自增),因为0被静态变量variablesToRemoveIndex抢占了
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }
    

    整体逻辑比较直观,如果index没有越界,那么进行覆盖,如果old value为 UNSET,那么返回true,否则返回false。如果越界则进行扩容并返回true。

    注意: 只有一种情况会返回fasle,即某个元素已经被赋值过了。那么返回false直接影响的是addToVariablesToRemove方法不会执行。即key不会再次被添加到Set集合中。因为已经被添加过了。

    expandIndexedVariableTableAndSet:

    ======================InternalThreadLocalMap===========================
    
    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        newCapacity ++;
    
        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        newArray[index] = value;
        indexedVariables = newArray;
    }
    

    这段代码的作用就是按原来的容量扩容2倍。并且保证结果是2的幂次方。JDK中HashMap的扩容也类似。扩容完成之后填充UNSET,并且将保存value。至此保存value的操作已经完毕。

    addToVariablesToRemove:

    =========================InternalThreadLocalMap===========================
    
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        // 获取到下表为0的数组元素
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        Set<FastThreadLocal<?>> variablesToRemove;
        // 如果未设置过或者为null
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            // 创建一个IdentityHashMap的实现,IdentityHashMap和HashMap类似,只是key的相同判断方式以 ‘==’来决定
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
             // 将这个 Set 放到这个 Map 数组的下标 0 处
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
        } else {
            // 如果拿到的不是 UNSET ,说明这是第二次操作了,因此可以强转为 Set
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }
        // 将 FastThreadLocal 放置到 Set 中
        variablesToRemove.add(variable);
    }
    

    这个方法的目的是将一条线程的所有FastThreadLocal对象保存到一个 Set 中,静态方法 removeAll 就需要使用到这个 Set,可以快速的删除线程 Map 里的所有 FTL 对应的 Value。 如果不使用 Set,那么就需要遍历 InternalThreadLocalMap。

    get方法

    get方法极为简单,实现如下:

    ===========================FastThreadLocal==========================
    
    public final V get() {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            Object v = threadLocalMap.indexedVariable(index);
            if (v != InternalThreadLocalMap.UNSET) {
                return (V) v;
            }
    
            return initialize(threadLocalMap);
        }
    

    首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中。

    initialize

    ============================FastThreadLocal==========================
    
    private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            //1、获取初始值
            v = initialValue();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // 2、设置value到InternalThreadLocalMap中
        threadLocalMap.setIndexedVariables(index, v);
        // 3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
        addToVariablesToRemove(threadLocalMap, this);
    
        return v;
    }
    
    //初始化参数:由子类复写
    protected V initialValue() throws Exception {
        return null;
    }
    

    ftl的资源回收机制

    netty中ftl的两种回收机制回收机制:

    • 自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理。

    • 手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除。

    FastThreadLocalRunnable

    final class FastThreadLocalRunnable implements Runnable {
        private final Runnable runnable;
    
        @Override
        public void run() {
            try {
                runnable.run();
            } finally {
                FastThreadLocal.removeAll();
            }
        }
    
        static Runnable wrap(Runnable runnable) {
            return runnable instanceof FastThreadLocalRunnable 
                    ? runnable : new FastThreadLocalRunnable(runnable);
        }
    }
    

    如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。

    ===============================FastThreadLocal===========================
    
    public static void removeAll() {
        // 获取到map
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
        if (threadLocalMap == null) {
            return;
        }
    
        try {
            // 获取到Set<FastThreadLocal>集合
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
            if (v != null && v != InternalThreadLocalMap.UNSET) {
                @SuppressWarnings("unchecked")
                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
                // 将Set转换为数组
                FastThreadLocal<?>[] variablesToRemoveArray =
                        variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
                // 遍历数组,删除每一个FastThreadLocal对应的value
                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                    tlv.remove(threadLocalMap);
                }
            }
        } finally {
            // 删除当前线程的InternalThreadLocalMap
            InternalThreadLocalMap.remove();
        }
    }
    
    public static void remove() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
             // 将FastThreadLocalThread 内部的map置位null
            ((FastThreadLocalThread) thread).setThreadLocalMap(null);
        } else {
            // 将 ThreadLocal内部ThreadLocalMap 中的value置位null
            slowThreadLocalMap.remove();
        }
    }
    

    remove方法:

    ===============================FastThreadLocal==========================
    
    private void remove() {
        remove(InternalThreadLocalMap.getIfSet());
    }
    
    private void remove(InternalThreadLocalMap threadLocalMap) {
        if (threadLocalMap == null) {
            return;
        }
        // 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value并设UNSET
        Object v = threadLocalMap.removeIndexedVariable(index);
        // 从 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象
        removeFromVariablesToRemove(threadLocalMap, this);
        // 如果删除的是有效值,则进行onRemove方法的回调
        if (v != InternalThreadLocalMap.UNSET) {
            try {
                // 回调子类复写的onRemoved方法,默认为空实现
                onRemoved((V) v);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    

    图解netty:FastThreadLocal实现原理分析_Joel.Wang老王的专栏-CSDN博客

    Netty进阶:自顶向下解析FastThreadLocal_TheLudlows的博客-CSDN博客

    相关文章

      网友评论

        本文标题:FastThreadLocal详解

        本文链接:https://www.haomeiwen.com/subject/tgazpktx.html