美文网首页
面试问题之redis缓存穿透,缓存击穿,缓存雪崩解决方案分析

面试问题之redis缓存穿透,缓存击穿,缓存雪崩解决方案分析

作者: xuezhongyu01 | 来源:发表于2020-12-28 22:47 被阅读0次

    怎么预防redis的缓存击穿

    缓存穿透

    缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞。

    解决方案

    1.接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截;
    2. 有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被 这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。另外也有一个更为简单粗暴的方法(我们采用的就是这种),如果一个查询返回的数据为空(不管是数 据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。

    bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。布隆过滤器的关键就在于hash算法和容器大小,下面先来简单的实现下看看效果,我这里用guava实现的布隆过滤器:

    <dependencies>  
         <dependency>  
             <groupId>com.google.guava</groupId>  
             <artifactId>guava</artifactId>  
             <version>23.0</version>  
         </dependency>  
    </dependencies> 
    
    public class BloomFilterTest {
     
        private static final int capacity = 1000000;
        private static final int key = 999998;
     
        private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), capacity);
     
        static {
            for (int i = 0; i < capacity; i++) {
                bloomFilter.put(i);
            }
        }
     
        public static void main(String[] args) {
            /*返回计算机最精确的时间,单位微妙*/
            long start = System.nanoTime();
     
            if (bloomFilter.mightContain(key)) {
                System.out.println("成功过滤到" + key);
            }
            long end = System.nanoTime();
            System.out.println("布隆过滤器消耗时间:" + (end - start));
            int sum = 0;
            for (int i = capacity + 20000; i < capacity + 30000; i++) {
                if (bloomFilter.mightContain(i)) {
                    sum = sum + 1;
                }
            }
            System.out.println("错判率为:" + sum);
        }
    }
    
    成功过滤到999998
    布隆过滤器消耗时间:215518
    错判率为:318
    

    可以看到,100w个数据中只消耗了约0.2毫秒就匹配到了key,速度足够快。然后模拟了1w个不存在于布隆过滤器中的key,匹配错误率为318/10000,也就是说,出错率大概为3%,跟踪下BloomFilter的源码发现默认的容错率就是0.03:

     
    public static <T> BloomFilter<T> create(Funnel<T> funnel, int expectedInsertions /* n */) {
      return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions
    }
    

    我们项目中很多都是变化的数据,所以空数据很快就会增加。

    缓存雪崩

    缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

    解决方案

    缓存失效时的雪崩效应对底层系统的冲击非常可怕。大多数系统设计者考虑用加锁或者队列的方式保证缓存的单线 程(进程)写,从而避免失效时大量的并发请求落到底层存储系统上。这里分享一个简单方案就时讲缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

    缓存击穿

    对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题,这个和缓存雪崩的区别在于这里针对某一key缓存,前者则是很多key。

    缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

    解决方案

    1.使用互斥锁(mutex key)

    业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。这里要注意,分布式环境中要使用分布式锁,单机的话用普通的锁(synchronized、Lock)就够了。

    业界比较常用的做法,是使用mutex。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db,而是先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX或者Memcache的ADD)去set一个mutex key,当操作返回成功时,再进行load db的操作并回设缓存;否则,就重试整个get缓存的方法。

    SETNX,是「SET if Not eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现锁的效果。在redis2.6.1之前版本未实现setnx的过期时间,所以这里给出两种版本代码参考:

    //2.6.1前单机版本锁
    String get(String key) {  
       String value = redis.get(key);  
       if (value  == null) {  
        if (redis.setnx(key_mutex, "1")) {  
            // 3 min timeout to avoid mutex holder crash  
            redis.expire(key_mutex, 3 * 60)  
            value = db.get(key);  
            redis.set(key, value);  
            redis.delete(key_mutex);  
        } else {  
            //其他线程休息50毫秒后重试  
            Thread.sleep(50);  
            get(key);  
        }  
      }  
     
    }
    

    最新版本代码:

     
    public String get(key) {
          String value = redis.get(key);
          if (value == null) { //代表缓存值过期
              //设置3min的超时,防止del操作失败的时候,下次缓存过期一直不能load db
              if (redis.setnx(key_mutex, 1, 3 * 60) == 1) {  //代表设置成功
                   value = db.get(key);
                          redis.set(key, value, expire_secs);
                          redis.del(key_mutex);
                  } else {  //这个时候代表同时候的其他线程已经load db并回设到缓存了,这时候重试获取缓存值即可
                          sleep(50);
                          get(key);  //重试
                  }
              } else {
                  return value;      
              }
     }
    
    public String getWithLock(String key, Jedis jedis, String lockKey, String uniqueId, long expireTime) {
        // 通过key获取value
        String value = redisService.get(key);
        if (StringUtil.isEmpty(value)) {
            // 分布式锁,详细可以参考https://blog.csdn.net/fanrenxiang/article/details/79803037
            //封装的tryDistributedLock包括setnx和expire两个功能,在低版本的redis中不支持
            try {
                boolean locked = redisService.tryDistributedLock(jedis, lockKey, uniqueId, expireTime);
                if (locked) {
                    value = userService.getById(key);
                    redisService.set(key, value);
                    redisService.del(lockKey);
                    return value;
                } else {
                    // 其它线程进来了没获取到锁便等待50ms后重试
                    Thread.sleep(50);
                    getWithLock(key, jedis, lockKey, uniqueId, expireTime);
                }
            } catch (Exception e) {
                log.error("getWithLock exception=" + e);
                return value;
            } finally {
                redisService.releaseDistributedLock(jedis, lockKey, uniqueId);
            }
        }
        return value;
     
    }
    

    这样做思路比较清晰,也从一定程度上减轻数据库压力,但是锁机制使得逻辑的复杂度增加,吞吐量也降低了,有点治标不治本。

    2. "提前"使用互斥锁(mutex key):

    在value内部设置1个超时值(timeout1), timeout1比实际的memcache timeout(timeout2)小。当从cache读取到timeout1发现它已经过期时候,马上延长timeout1并重新设置到cache。然后再从数据库加载数据并设置到cache中。伪代码如下:

     
    v = memcache.get(key);  
    if (v == null) {  
        if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {  
            value = db.get(key);  
            memcache.set(key, value);  
            memcache.delete(key_mutex);  
        } else {  
            sleep(50);  
            retry();  
        }  
    } else {  
        if (v.timeout <= now()) {  
            if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {  
                // extend the timeout for other threads  
                v.timeout += 3 * 60 * 1000;  
                memcache.set(key, v, KEY_TIMEOUT * 2);  
      
                // load the latest value from db  
                v = db.get(key);  
                v.timeout = KEY_TIMEOUT;  
                memcache.set(key, value, KEY_TIMEOUT * 2);  
                memcache.delete(key_mutex);  
            } else {  
                sleep(50);  
                retry();  
            }  
        }  
     
    }
    

    3、接口限流与熔断、降级
    重要的接口一定要做好限流策略,防止用户恶意刷接口,同时要降级准备,当接口中的某些服务不可用时候,进行熔断,失败快速返回机制。

    相关文章

      网友评论

          本文标题:面试问题之redis缓存穿透,缓存击穿,缓存雪崩解决方案分析

          本文链接:https://www.haomeiwen.com/subject/xasxnktx.html