结论: 加锁
如何加锁?
ef core 为了性能, 需要缓存大量的表达式树(Expression) 和 表达式树 所对应的 SqlText.
当多个线程同时对一个key进行缓存时就会出现并发. 这时候就需要先执行其中一个线程的操作 同时阻塞剩余的线程.
同时这个阻塞不能是全局的 否则将会严重影响性能.
ef core 的解决办法就是 一个key 对应一把锁
源码解析
下面是ef core 的部分源码, 这个方法的目的就是 根据表达式和参数 获取 SqlCommand .
如果缓存中有,那么走缓存,
如果没有则重新计算 并根据计算的结果指示进行缓存.
private static readonly ConcurrentDictionary<object, object> _locks = new(); # 使用线程安全的 ConcurrentDictionary 来存锁.
private readonly IMemoryCache _memoryCache;
public virtual IRelationalCommand GetRelationalCommand([NotNull] IReadOnlyDictionary<string, object?> parameters)
{
var cacheKey = new CommandCacheKey(_selectExpression, parameters); # 算出缓存的 key
if (_memoryCache.TryGetValue(cacheKey, out IRelationalCommand relationalCommand)) # 获取对象 并返回.
{
return relationalCommand;
}
// When multiple threads attempt to start processing the same query (program startup / thundering
// herd), have only one actually process and block the others.
// Note that the following synchronization isn't perfect - some race conditions may cause concurrent
// processing. This is benign (and rare).
var compilationLock = _locks.GetOrAdd(cacheKey, _ => new object()); # 取锁,如果有其他的线程, 那么将会获得同一把锁. 没有则new 一个
try
{
lock (compilationLock) # 加锁
{
# 因为与其他线程用的是同一把锁,能进入临界区,说明 这时其他线程 可能释放掉了锁. 并且已经把对象缓存, 所以需要再次尝试获取对象
if (!_memoryCache.TryGetValue(cacheKey, out relationalCommand))
{
var selectExpression = _relationalParameterBasedSqlProcessor.Optimize(
_selectExpression, parameters, out var canCache);
relationalCommand = _querySqlGeneratorFactory.Create().GetCommand(selectExpression);
if (canCache)
{
_memoryCache.Set(cacheKey, relationalCommand, new MemoryCacheEntryOptions { Size = 10 });
}
}
return relationalCommand;
}
}
finally
{
_locks.TryRemove(cacheKey, out _); # 完成 缓存 释放锁, 如果此时有其他的线程正在使用同一个cacheKey , 这个方法会return false.
}
}
简而言之, 就是利用 ConcurrentDictionary 线程安全的特性(可以理解为 访问从 ConcurrentDictionary 中获取的对象会天然的有一把琐).
当完成对CacheKey的占用后 释放锁.
try remove 测试代码 和 执行结果
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
private static readonly ConcurrentDictionary<object, object> _locks
= new();
static async Task Main(string[] args)
{
var key = "key";
var task1 = Task.Run(async () => await TestLock(key));
var task2 = Task.Run(async () =>await TestLock(key));
await Task.WhenAll(task1, task2);
}
public static async Task TestLock(string cacheKey)
{
var compilationLock = _locks.GetOrAdd(cacheKey, _ => new object());
await Task.Delay(1000);
try
{
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} Enter Locking");
lock (compilationLock)
{
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} Locking at Date {DateTime.Now.ToLongTimeString()}");
Thread.Sleep(10000);
}
}
finally
{
Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} Remove Lock Result is {_locks.TryRemove(cacheKey, out _)}");
}
}
}
}
try remove 执行结果,
因为 Thread 4 已经先一步Remove ,所以 Thread 6 失败了. 并且因为使用的同一把锁, 所以6比4晚了10秒才进入临界区
Thread: 4 Enter Locking
Thread: 6 Enter Locking
Thread: 4 Locking at Date 7:17:26
Thread: 6 Locking at Date 7:17:36
Thread: 4 Remove Lock Result is True
Thread: 6 Remove Lock Result is False
参考:
https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2?view=net-5.0
https://github.com/dotnet/efcore/blob/main/src/EFCore/Query/Internal/CompiledQueryCache.cs
网友评论