前言
现在正是抗击新型冠状病毒关键时期,为了响应国家号召在家休息减少外出这是作为我们普通老百姓对国家最大的贡献,同时必要外出时必须要戴上口罩,回家后需要使用消毒剂或肥皂洗手。相信国家和一线的工作者还有我们广大的人民群众我们一定能够战胜。有关最新疫情实时数据可以查看官方数据以下链接直达!
什么是ThreadLocal
回到今天的主题,ThreadLocal我相信有个工作经验的小伙伴一定有所了解,或许你可能已经深入源码了解过,so 今天我们一起来从头到尾、简单到复杂以及深入源码一起来分析。
那么什么是TheadLocal呢?到目前为止你可以这样理解,联想到我们的JMM内存模型,以下代码说明:
public static void main(String[] args) {
Integer temp = Integer.valueOf(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> System.out.println("The thread is hold the "+temp+" temp "));
}
}
可以从上面的代码看出我们的java内存模型
- 首先在虚拟机内存分配一个
Integer
大小的内存然后一个temp
内存指向改内存地址 - 开启10个线程,每个线程从主存中获取
temp
指向地址内存的副本 - 然后输出副本内容
But我们的ThreadLocal和JMM有点类似,即每个线程都持有ThreadLocal的一个本地副本线程持有
,并且互相隔离、不相互影响。
ThreadLocal使用场景
入门示例
package com.ouwen.springboot.juc;
import java.util.concurrent.TimeUnit;
/**
* @author <a href="http://youngitman.tech">青年IT男</a>
* @version v1.0.0
* @className ThreadLocalTest
* @description
* @date 2020-01-19 22:06
* @JunitTest: {@link }
**/
public class ThreadLocalTest {
public static void main(String[] args) throws InterruptedException {
testForNonInit();
// testForInit();
TimeUnit.MILLISECONDS.sleep(10_000);//jvm在所有非Daemon线程退出后停止
}
/***
*
* 测试不带初始化方式
*
* @author liyong
* @date 11:32 2020-01-29
* * @param
* @exception
* @return void
**/
private static void testForNonInit() {
ThreadLocal threadLocal = new ThreadLocal<Integer>();//new 一个示例无初始值
for (int i = 0; i < 10; i++) {
final int index = I;
new Thread(() -> {
System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at before");
threadLocal.set(index);
System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at after ");
threadLocal.remove();
}).start();
}
testMain(threadLocal);
}
/***
*
* 测试带初始化值方式
*
* @author liyong
* @date 11:32 2020-01-29
* * @param
* @exception
* @return void
**/
private static void testForInit() {
ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 10);//通过静态方法withInitial传入一个初始化方法,更灵活
for (int i = 0; i < 10; i++) {
final int index = I;
new Thread(() -> {
System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at before");
threadLocal.set(index);
System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at after ");
threadLocal.remove();
}).start();
}
testMain(threadLocal);
}
/***
*
* 测试主线程
*
* @author liyong
* @date 11:45 2020-01-29
* * @param threadLocal
* @exception
* @return void
**/
private static void testMain(ThreadLocal threadLocal){
System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at before");
threadLocal.set("`mainindex`");
System.out.println("The thread" + Thread.currentThread().getName() + " index number is " + threadLocal.get() + " at after ");
threadLocal.remove();
}
}
-
testForNonInit()输出结果
The threadThread-9 index number is null at before The threadThread-0 index number is null at before The threadThread-6 index number is null at before The threadThread-7 index number is null at before The threadThread-4 index number is null at before The threadThread-3 index number is null at before The threadThread-5 index number is null at before The threadThread-1 index number is null at before The threadThread-9 index number is 9 at after The threadThread-8 index number is null at before The threadThread-7 index number is 7 at after The threadThread-2 index number is null at before The threadThread-4 index number is 4 at after The threadThread-1 index number is 1 at after The threadThread-5 index number is 5 at after The threadThread-6 index number is 6 at after The threadThread-8 index number is 8 at after The threadThread-0 index number is 0 at after The threadmain index number is null at before The threadThread-3 index number is 3 at after The threadThread-2 index number is 2 at after The threadmain index number is `mainindex` at after
可以看出在不带初始化值的方式是
threadLocal.get()
是没有值的,同时每一个线程threadLocal.set(index)
设置的值互不相影响、相互隔离。 -
testForInit()输出结果
The threadThread-4 index number is 10 at before The threadThread-2 index number is 10 at before The threadThread-8 index number is 10 at before The threadThread-2 index number is 2 at after The threadThread-8 index number is 8 at after The threadThread-5 index number is 10 at before The threadThread-5 index number is 5 at after The threadThread-7 index number is 10 at before The threadThread-7 index number is 7 at after The threadThread-0 index number is 10 at before The threadmain index number is 10 at before The threadThread-0 index number is 0 at after The threadThread-1 index number is 10 at before The threadThread-1 index number is 1 at after The threadThread-4 index number is 4 at after The threadThread-3 index number is 10 at before The threadThread-3 index number is 3 at after The threadThread-9 index number is 10 at before The threadThread-9 index number is 9 at after The threadThread-6 index number is 10 at before The threadThread-6 index number is 6 at after The threadmain index number is `mainindex` at after
可以看出在不带初始化值的方式是
threadLocal.get()
是有 默认值的,同时每一个线程threadLocal.set(index)
设置的值互不相影响、相互隔离。 -
图解
ThreadLocal
Spring事务使用
在spring中对事物管理的抽象如下:
AbstractTransactionManager其中DataSourceTransactionManager
是我们着重关注的,首先看下我们的使用demo
@Override
public void hello2() {
// 构造一个准备使用此事务的定义信息~~~
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
transactionDefinition.setReadOnly(false);
//隔离级别,-1表示使用数据库默认级别
transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
// 根据此事务属性,拿到一个事务实例 注意此处的入参是一个:TransactionDefinition
TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
try {
// =================做你的逻辑start 必须try住,但无需写finally=======================
// 向数据库插入一条记录
String sql = "insert into user (name,age) values ('fsx',21)";
jdbcTemplate.update(sql);
// 做其余的事情 可能抛出异常
System.out.println(1 / 0);
// =================做你的逻辑start=======================
// 提交事务
transactionManager.commit(transaction);
} catch (Exception e) {
// 若发现异常 事务进行回滚
transactionManager.rollback(transaction);
throw e;
}
}
从AbstractPlatformTransactionManager.getTransaction
中可以到
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();//着重看这里其他忽略掉
//...
}
看DataSourceTransactionManager.doGetTransaction
的实现
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());//从事务管理器中获取ConnectionHolder,而ConnectionHolder封装了对数据连接的持有
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
继续跟踪org.springframework.jdbc.datasource.DataSourceTransactionManager#obtainDataSource
/**
* Obtain the DataSource for actual use.获取真实的数据库连接
* @return the DataSource (never {@code null})
* @throws IllegalStateException in case of no DataSource set
* @since 5.0
*/
protected DataSource obtainDataSource() {
DataSource dataSource = getDataSource();
Assert.state(dataSource != null, "No DataSource set");
return dataSource;
}
然后使用org.springframework.transaction.support.TransactionSynchronizationManager#getResource
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);//获取真实对象 Unwrap the given resource handle if necessary; otherwise return the given handle as-is
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
关键的地方到了org.springframework.transaction.support.TransactionSynchronizationManager#doGetResource
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");//事务和线程绑定 <DataSource,ConnectionHolder>
/**
* Actually check the value of the resource that is bound for the given key.
*/
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);//获取这个DataSource对应的ConnectionHolder
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();//help gc
}
value = null;
}
return value;
}
ThreadLocal源码分析
下面我们一起来通过上面的入门示例分析ThreadLocal源码
-
ThreadLocal的实例化
-
new ThreadLocal()
空的构造函数略过/** * Creates a thread local variable. * @see #withInitial(java.util.function.Supplier) */ public ThreadLocal() { }
-
ThreadLocal.withInitial(method)
需要提供一个Supplier
方法这是jdk8提供的方式(提供一个生产者
方法)public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) { return new SuppliedThreadLocal<>(supplier); }
SuppliedThreadLocal
是TheadLocal的内部类static final class SuppliedThreadLocal<T> extends ThreadLocal<T> { private final Supplier<? extends T> supplier; SuppliedThreadLocal(Supplier<? extends T> supplier) { this.supplier = Objects.requireNonNull(supplier); } //重写ThreadLocal的initialValue方法,后面会降到调用地方 @Override protected T initialValue() { return supplier.get();//调用supplier的get方法执行method方法体 } }
-
-
ThreadLocal.get()方法调用
public T get() { Thread t = Thread.currentThread();//拿到当前线程 ThreadLocalMap map = getMap(t);//从t线程中获取 ThreadLocal.ThreadLocalMap threadLocals持有的值 if (map != null) {//当前线程存在ThreadLocalMap ThreadLocalMap.Entry e = map.getEntry(this);//以当天ThreadLocal对象为key从ThreadLocal.ThreadLocalMap获取ThreadLocalMap.Entry值 if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value;//强制转换T类型 return result; } } return setInitialValue();//调用 }
使用
java.lang.ThreadLocal#getMap
获取当前线程持有的ThreadLocal.ThreadLocalMap
ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
分析
java.lang.ThreadLocal.ThreadLocalMap#getEntry
方法获取ThreadLocalMap.Entry
private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1);//获取在数组中索引为止。可以认为就是我们日常所说的取模,区别是这个二进制运算更高效且数据更为分散碰撞几率低等特点(使用了斐波那契散列法)。 Entry e = table[I]; if (e != null && e.get() == key)//e不等于null并且Entry的key==传入的ThreadLocal return e; else return getEntryAfterMiss(key, i, e);//e==null或者key和当前i查找出来TheadLocal不相等 }
接下来继续分析
java.lang.ThreadLocal.ThreadLocalMap#getEntryAfterMiss
方法private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; //根据进入条件e==null或者key和当前i查找,可以知道当进入条件e==null时直接return null while (e != null) { ThreadLocal<?> k = e.get(); if (k == key)//再次判断ThreadLoacl对象是否相等 return e; if (k == null)//k等于null说明被gc回收,触发一些`stale`的清理(后面具体分析回收原理) expungeStaleEntry(i); else i = nextIndex(i, len);//可理解为环查找 e = tab[I]; } return null; } }
下面重点分析java.lang.ThreadLocal.ThreadLocalMap#expungeStaleEntry
方法,这个方法主要做了一些stale
的Entry的清理工作并且重新整理存在hash索引冲突的元素位置。
private int expungeStaleEntry(int staleSlot) {//staleSlot表示这个Entry的key被回收的索引位置
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot to help gc
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;//统计减1
// Rehash until we encounter null
Entry e;
int I;
for (i = nextIndex(staleSlot, len);//初始位置
(e = tab[i]) != null;//数组中的Entry不为null
i = nextIndex(i, len)) {//从i位置开始查找下个索引位置(环型查找)
ThreadLocal<?> k = e.get();
if (k == null) {//说明Entry的key(ThreadLocal)被gc回收
e.value = null;//help gc
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {//说明i位置的ThreadLocal和h位置的ThreadLocal索引不等,存在冲突
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)//从h位置向下查找为null的位置
h = nextIndex(h, len);
tab[h] = e;//把i这个位置的entry放到查找到为null值的位置去(整理冲突位置元素)
}
}
}
return I;
}
理解下java.lang.ThreadLocal.ThreadLocalMap#nextIndex
,相当于是环型查找
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
当ThreadLocalMap不存在值调用java.lang.ThreadLocal#setInitialValue
进行相关初始化
private T setInitialValue() {
T value = initialValue();//调用SuppliedThreadLocal重写的initialValue方法
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);//添加当前ThreadLocal的值到ThreadLocalMap
else
createMap(t, value);//创建ThreadLocalMap
return value;
}
-
ThreadLocal.set()方法调用
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t);//首先获取当前线程持有的 ThreadLocal.ThreadLocalMap if (map != null) map.set(this, value);//覆盖该值 else createMap(t, value);//不存在新建 ThreadLocal.ThreadLocalMap }
下面重点分析java.lang.ThreadLocal.ThreadLocalMap#set
方法
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);//获取在数组中索引为止。可以认为就是我们日常所说的取模,区别是这个二进制运算更高效且数据更为分散碰撞几率低等特点(使用了斐波那契散列法)。
for (Entry e = tab[i];//i索引位置初始值
e != null;//循环条件Entry不为null
e = tab[i = nextIndex(i, len)]) {//环型查找元素
ThreadLocal<?> k = e.get();
if (k == key) {//Entry数组中存在以这个ThreadLocal对象,替换它的值
e.value = value;
return;
}
if (k == null) {//该Entry的ThreadLocal被gc回收
replaceStaleEntry(key, value, i);
return;
}
}
//不存在Entry,new一个对象
tab[i] = new Entry(key, value);
int sz = ++size;//统计值增加1
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
分析一下java.lang.ThreadLocal.ThreadLocalMap#replaceStaleEntry
方法,该方法从staleSlot开始查找替换stale
的值(被gc回收了的)
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);//staleSlot索引位置开始向前查找
(e = tab[i]) != null;//循环查找到Entry不为null为止
i = prevIndex(i, len))//向前环型查找(反向)
if (e.get() == null)//查找Entry的key被gc回收的索引位置
slotToExpunge = I;
// Find either the key or trailing null slot of run, whichever
// occurs first
for (int i = nextIndex(staleSlot, len);//初始值索引位置
(e = tab[i]) != null;//查找到Entry不为null为止
i = nextIndex(i, len)) {//环型向下查找索引(正向)
ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
if (k == key) {//因为ThreadLocal通过线性探针方式,在存在hash冲突的时候查找下一个null位置插入元素,索引这里就存在交换会原谅未冲突的索引位置,提高后面的查找效率
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
if (slotToExpunge == staleSlot)
slotToExpunge = I;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = I;
}
// If key not found, put new entry in stale slot
tab[staleSlot].value = null;//help gc
tab[staleSlot] = new Entry(key, value);//新建一个Entry
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
下面看下java.lang.ThreadLocal.ThreadLocalMap#cleanSomeSlots
方法,该方法主要通过启发式的查找过时(stale)的Entry,同时两种触发方式分别是:新增元素、有其他stale
的Entry被清理的时候。扫描逻辑为:它执行对数数量的扫描,以平衡无扫描(快速但保留垃圾)和与元素数量成比例的扫描次数
/**
* Heuristically scan some cells looking for stale entries.
* This is invoked when either a new element is added, or
* another stale one has been expunged. It performs a
* logarithmic number of scans, as a balance between no
* scanning (fast but retains garbage) and a number of scans
* proportional to number of elements, that would find all
* garbage but would cause some insertions to take O(n) time.
*
* @param i a position known NOT to hold a stale entry. The
* scan starts at the element after i.
*
* @param n scan control: {@code log2(n)} cells are scanned,
* unless a stale entry is found, in which case
* {@code log2(table.length)-1} additional cells are scanned.
* When called from insertions, this parameter is the number
* of elements, but when from replaceStaleEntry, it is the
* table length. (Note: all this could be changed to be either
* more or less aggressive by weighting n instead of just
* using straight log n. But this version is simple, fast, and
* seems to work well.)
*
* @return true if any stale entries have been removed.
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[I];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);//对数
return removed;
}
当前线程不存在ThreadLocalMap是调用java.lang.ThreadLocal#createMap
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);//new一个对象
}
这里关键看ThreadLocalMap
构造函数代码如下:
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);//获取在数组中索引为止。可以认为就是我们日常所说的取模,区别是这个二进制运算更高效且数据更为分散碰撞几率低等特点(使用了斐波那契散列法)。
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);//设置容量阀值
}
可以参考散列算法了解斐波那契散列法。
ThreadLocal彩蛋
InheritableThreadLocal
在一些特定的场景需要在当前线程开启另外一个线程同时当前现在的ThreadLocal中的ThreadLocal.ThreadLocalMap的值需要传递到被开启的线程中那么InheritableThreadLocal就是这个目的。在接下来的文章会分析这个类。
从早上10点左右开始整理和编写这篇文章确实写得有点累,但对于坚持在一线疫区的英雄们他们才是真正的英雄,向他们致敬!希望通过我写的这篇文章帮助到一些小伙伴,同时希望大家一起行动起来我们一起来打赢这场没有硝烟的战争。
网友评论