美文网首页
ef core中对缓存并发写的处理(部分)

ef core中对缓存并发写的处理(部分)

作者: Codmowa | 来源:发表于2021-02-21 01:10 被阅读0次

    结论: 加锁

    如何加锁?

    ef core 为了性能, 需要缓存大量的表达式树(Expression) 和 表达式树 所对应的 SqlText.
    当多个线程同时对一个key进行缓存时就会出现并发. 这时候就需要先执行其中一个线程的操作 同时阻塞剩余的线程.
    同时这个阻塞不能是全局的 否则将会严重影响性能.
    ef core 的解决办法就是 一个key 对应一把锁

    源码解析

    下面是ef core 的部分源码, 这个方法的目的就是 根据表达式和参数 获取 SqlCommand .
    如果缓存中有,那么走缓存,
    如果没有则重新计算 并根据计算的结果指示进行缓存.

            private static readonly ConcurrentDictionary<object, object> _locks   = new(); # 使用线程安全的 ConcurrentDictionary 来存锁.
    
            private readonly IMemoryCache _memoryCache;
    
            public virtual IRelationalCommand GetRelationalCommand([NotNull] IReadOnlyDictionary<string, object?> parameters)
            {
                var cacheKey = new CommandCacheKey(_selectExpression, parameters); # 算出缓存的 key 
    
                if (_memoryCache.TryGetValue(cacheKey, out IRelationalCommand relationalCommand)) # 获取对象 并返回. 
                {
                    return relationalCommand;
                }
    
                // When multiple threads attempt to start processing the same query (program startup / thundering
                // herd), have only one actually process and block the others.
                // Note that the following synchronization isn't perfect - some race conditions may cause concurrent
                // processing. This is benign (and rare).
                var compilationLock = _locks.GetOrAdd(cacheKey, _ => new object());  # 取锁,如果有其他的线程, 那么将会获得同一把锁. 没有则new 一个
                try
                {
                    lock (compilationLock)  # 加锁
                    {
                        # 因为与其他线程用的是同一把锁,能进入临界区,说明 这时其他线程 可能释放掉了锁. 并且已经把对象缓存, 所以需要再次尝试获取对象
                        if (!_memoryCache.TryGetValue(cacheKey, out relationalCommand))  
                        {
                            var selectExpression = _relationalParameterBasedSqlProcessor.Optimize(
                                _selectExpression, parameters, out var canCache);
                            relationalCommand = _querySqlGeneratorFactory.Create().GetCommand(selectExpression);
    
                            if (canCache)
                            {
                                _memoryCache.Set(cacheKey, relationalCommand, new MemoryCacheEntryOptions { Size = 10 });
                            }
                        }
    
                        return relationalCommand;
                    }
                }
                finally
                {
                    _locks.TryRemove(cacheKey, out _);  # 完成 缓存 释放锁, 如果此时有其他的线程正在使用同一个cacheKey , 这个方法会return false. 
                }
            }
    
    

    简而言之, 就是利用 ConcurrentDictionary 线程安全的特性(可以理解为 访问从 ConcurrentDictionary 中获取的对象会天然的有一把琐).
    当完成对CacheKey的占用后 释放锁.

    try remove 测试代码 和 执行结果

    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            private static readonly ConcurrentDictionary<object, object> _locks
        = new();
            static async Task Main(string[] args)
            {
                var key = "key";
                var task1 =  Task.Run(async () => await TestLock(key));
                 var task2 =  Task.Run(async () =>await TestLock(key));
                 await Task.WhenAll(task1, task2);
            }
    
            public static async Task TestLock(string cacheKey)
            {
                var compilationLock = _locks.GetOrAdd(cacheKey, _ => new object());
                await Task.Delay(1000);
                try
                {
                    Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} Enter Locking");
                    lock (compilationLock)
                    {
                        Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} Locking at Date {DateTime.Now.ToLongTimeString()}");
                        Thread.Sleep(10000);
                    }
                }
                finally
                {
                    Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} Remove Lock Result is  {_locks.TryRemove(cacheKey, out _)}");
                }
            }
        }
    }
    
    

    try remove 执行结果,

    因为 Thread 4 已经先一步Remove ,所以 Thread 6 失败了. 并且因为使用的同一把锁, 所以6比4晚了10秒才进入临界区

    Thread: 4 Enter Locking
    Thread: 6 Enter Locking
    Thread: 4 Locking at Date 7:17:26
    Thread: 6 Locking at Date 7:17:36
    Thread: 4 Remove Lock Result is  True
    Thread: 6 Remove Lock Result is  False
    

    参考:
    https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2?view=net-5.0
    https://github.com/dotnet/efcore/blob/main/src/EFCore/Query/Internal/CompiledQueryCache.cs

    相关文章

      网友评论

          本文标题:ef core中对缓存并发写的处理(部分)

          本文链接:https://www.haomeiwen.com/subject/vssffltx.html