从Spring框架中的一个BUG来分析锁使用的问题
锁的使用是一个非常细致的事情,如果使用不当会导致系统性能急剧下降,下面我们通过分析Spring框架中一个锁使用错误的一个BUG来总结一下Lock这部分的知识。 关于BUG的细节,可以Google " MimeTypeUtils LRU cache " 关键词找到。
本文将关键的代码摘出。
测试有问题版本的 ConcurrentLruCache
注意: metrics 属性及其逻辑,是为了收集指标加入的,原本Spring框架代码中没有这段
/**
* 并发的Lur Cache
* @param <K>
* @param <V>
*/
private static class ConcurrentLruCache<K, V> implements LruCache<K, V>{
private final int maxSize;
private final ConcurrentLinkedQueue<K> queue = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Function<K, V> generator;
private final List<Long> metrics;
public ConcurrentLruCache(int maxSize, Function<K, V> generator , List<Long> metrics) {
Assert.isTrue(maxSize > 0, "LRU max size should be positive");
Assert.notNull(generator, "Generator function should not be null");
this.maxSize = maxSize;
this.generator = generator;
this.metrics = metrics;
}
@Override
public V get(K key) {
this.lock.readLock().lock();
try {
if (this.queue.size() < this.maxSize / 2) {
V cached = this.cache.get(key);
if (cached != null) {
return cached;
}
}
else if (this.queue.remove(key)) {
this.queue.add(key);
return this.cache.get(key);
}
}
finally {
this.lock.readLock().unlock();
}
this.lock.writeLock().lock();
long s = System.currentTimeMillis();
try {
// retrying in case of concurrent reads on the same key
if (this.queue.remove(key)) {
this.queue.add(key);
return this.cache.get(key);
}
if (this.queue.size() == this.maxSize) {
K leastUsed = this.queue.poll();
if (leastUsed != null) {
this.cache.remove(leastUsed);
}
}
V value = this.generator.apply(key);
this.queue.add(key);
this.cache.put(key, value);
return value;
}
finally {
metrics.add(System.currentTimeMillis()-s);
this.lock.writeLock().unlock();
}
}
}
我们通过 benchmarkLruCache 的程序进行测试,这个程序也会测试修改后的版本。
/**
*
* @param loop 循环次数
* @param sample 模拟的VALUE不同种类数
* @param lruCache 测试的Lru实现
* @return
*/
private static List<Long> benchmarkLruCache(int loop , int sample , LruCache<String,Object> lruCache){
List<Long> times = Collections.synchronizedList(new ArrayList<>(loop));
CountDownLatch countDownLatch = new CountDownLatch(loop);
ExecutorService executor = Executors.newFixedThreadPool(100);
IntStream.rangeClosed(1,loop)
.forEach(i ->{
executor.execute(() -> {
long s = System.currentTimeMillis();
IntStream.rangeClosed(1,sample)
.forEach(n -> {
String v = n+"";
if(!lruCache.get(v).equals(v .hashCode())){
throw new RuntimeException("结果错误");
}
});
times.add((System.currentTimeMillis() - s));
countDownLatch.countDown();
});
});
executor.shutdown();
try {
countDownLatch.await();
}catch (InterruptedException e){}
return times;
}
构造 concurrentLruCache 进行测试
int loop = 10000;
int maxSize = 64;
int sample = 65;
List<Long> metrics1 = Collections.synchronizedList(new ArrayList<>(loop * sample));
LruCache<String,Object> concurrentLruCache = new ConcurrentLruCache<String, Object>(maxSize, v -> v.hashCode(),metrics1);
System.out.println("有问题的 ConcurrentLruCache 版本总耗时 : "+benchmarkLruCache(loop,sample,concurrentLruCache).stream()
// .peek(System.out::println)
.mapToLong(v -> v)
.sum());
System.out.println("有问题的 ConcurrentLruCache 写次数与耗时 : "+metrics1.stream().mapToLong(v -> v).summaryStatistics());
测试结果
有问题的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=127519404, min=0, average=196.183698, max=1222}
有问题的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=267141, sum=20882, min=0, average=0.078168, max=2}
测试优化后版本的 ConcurrentLruCache
private static class OptimizeConcurrentLruCache<K, V> implements LruCache<K, V>{
private final int maxSize;
private final ConcurrentLinkedDeque<K> queue = new ConcurrentLinkedDeque<>();
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final ReadWriteLock lock;
private final Function<K, V> generator;
private volatile int size = 0;
private final List<Long> metrics;
public OptimizeConcurrentLruCache(int maxSize, Function<K, V> generator ,List<Long> metrics ) {
Assert.isTrue(maxSize > 0, "LRU max size should be positive");
Assert.notNull(generator, "Generator function should not be null");
this.maxSize = maxSize;
this.generator = generator;
this.lock = new ReentrantReadWriteLock();
this.metrics = metrics;
}
@Override
public V get(K key) {
V cached = this.cache.get(key);
if (cached != null) {
if (this.size < this.maxSize) {
return cached;
}
this.lock.readLock().lock();
try {
if (this.queue.removeLastOccurrence(key)) {
this.queue.offer(key);
}
return cached;
}
finally {
this.lock.readLock().unlock();
}
}
this.lock.writeLock().lock();
long s = System.currentTimeMillis();
try {
// Retrying in case of concurrent reads on the same key
cached = this.cache.get(key);
if (cached != null) {
if (this.queue.removeLastOccurrence(key)) {
this.queue.offer(key);
}
return cached;
}
// Generate value first, to prevent size inconsistency
V value = this.generator.apply(key);
int cacheSize = this.size;
if (cacheSize == this.maxSize) {
K leastUsed = this.queue.poll();
if (leastUsed != null) {
this.cache.remove(leastUsed);
cacheSize--;
}
}
this.queue.offer(key);
this.cache.put(key, value);
this.size = cacheSize + 1;
return value;
}
finally {
metrics.add(System.currentTimeMillis() - s);
this.lock.writeLock().unlock();
}
}
}
构造 concurrentLruCache 进行测试,测试参数保持一致
List<Long> metrics2 = Collections.synchronizedList(new ArrayList<>(loop * sample));
LruCache<String,Object> optimizeConcurrentLruCache = new OptimizeConcurrentLruCache<String, Object>(maxSize, v -> v.hashCode(),metrics2);
System.out.println("优化后的 ConcurrentLruCache 版本总耗时 : "+benchmarkLruCache(loop,sample,optimizeConcurrentLruCache).stream()
// .peek(System.out::println)
.mapToLong(v -> v)
.summaryStatistics());
System.out.println("优化后的 ConcurrentLruCache 写次数与耗时 : "+metrics2.stream().mapToLong(v -> v).summaryStatistics());
测试结果
优化后的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=1881120, min=0, average=2.894031, max=62}
优化后的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=56995, sum=44, min=0, average=0.000772, max=19}
原因分析
我们通过 jstack 打印出线程堆栈
"pool-1-thread-100" #109 prio=5 os_prio=31 tid=0x00007fdc0708c800 nid=0xe403 waiting on condition [0x000070000a660000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
// .... 省略
at com.izhengyin.demo.concurrent.part6.LruCacheIssueTest$ConcurrentLruCache.get(LruCacheIssueTest.java:133)
// .... 省略
"pool-1-thread-97" #106 prio=5 os_prio=31 tid=0x00007fdc07817800 nid=0xe003 waiting on condition [0x000070000a357000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
// .... 省略
at com.izhengyin.demo.concurrent.part6.LruCacheIssueTest$ConcurrentLruCache.get(LruCacheIssueTest.java:114)
// .... 省略
通过堆栈信息,可以直观的看到,线程都在 WAITING 状态,分别是在 LruCacheIssueTest.java 133行与114行,查看源码,这两行分别对应的写锁的获取与读锁的获取
类: ConcurrentLruCache
public V get(K key) {
... 省略
this.lock.readLock().lock(); 114 行
... 省略
this.lock.writeLock().lock(); 113 行
}
分析源码,按照我们给定的参数 (maxSize < sample)
-
get 方法每次执行时都会先获取读锁 , 如果此时写锁为释放那么就会阻塞
-
按照我们的样品数,分析读锁代码块里的代码,这段代码有三个问题 (看注释)
2.1 操作queue未加写锁,因此 queue 会被不断的Remove / add , 同时也导致了很多 this.queue.remove(key) 是不会命中的,这样就讲流程下放到了写操作,这种情况线程数越多竞争的越厉害,情况越会加剧
2.2 this.queue 的 remove 是从头往后找,而add是网队尾添加,所以大部分情况都会进行N次扫描,效率不高
2.3 ConcurrentLinkedQueue remove ,存在内存泄漏问题。据描述是8u60版本引入的,我下下载了8u101与8u102的源代码,确定问题在8u102被修复,也就是说java8的部分版本也收到了影响。
//这段代码不会被满足 ,因为 queue "一直" 是满的
if (this.queue.size() < this.maxSize / 2) {
V cached = this.cache.get(key);
if (cached != null) {
return cached;
}
}
//这段代码有二个问题
// 1. 此次操作queue未加写锁,因此 queue 会被不断的Remove / add , 同时也导致了很多 this.queue.remove(key) 是不会命中的,这样就讲流程下放到了写操作,这种情况线程数越多竞争的越厉害,情况越会加剧
// 2. this.queue 的 remove 是从头往后找,而add是网队尾添加,所以大部分情况都会进行N次扫描,效率不高
else if (this.queue.remove(key)) {
this.queue.add(key);
return this.cache.get(key);
}
- 写操作上逻辑上没有太大问题,主要是大量的写锁或阻塞读,读锁也会反过来阻塞写锁的获取。所以问题代码主要还是读操作代码段的问题
优化后的版本分析
- 采用了 ConcurrentLinkedDeque 替换了 ConcurrentLinkedQueue , 通过 removeLastOccurrence 从后往前删除,减少了删除操作的扫码次数。
- 读操作,先不加锁,只要能读到,就返回,极大的减少写流程的次数,这也就减少了获取锁的等待
V cached = this.cache.get(key);
if (cached != null) {
//这个条件不会成立,但影响不大了
if (this.size < this.maxSize) {
return cached;
}
this.lock.readLock().lock();
try {
// 采用了 ConcurrentLinkedDeque 替换了 ConcurrentLinkedQueue , 通过 removeLastOccurrence 从后往前删除,减少了删除操作的扫码次数。
if (this.queue.removeLastOccurrence(key)) {
this.queue.offer(key);
}
// 只要能读到就返回
return cached;
}
finally {
this.lock.readLock().unlock();
}
}
我们将两次的结果放在一起做一个直观对比,可以很直观的看到优化前后的差异。
有问题的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=127519404, min=0, average=196.183698, max=1222}
有问题的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=267141, sum=20882, min=0, average=0.078168, max=2}
优化后的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=1881120, min=0, average=2.894031, max=62}
优化后的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=56995, sum=44, min=0, average=0.000772, max=19}
网友评论