Guava Cache: Architecture and Expired-Entry Cleanup


Author: monk87 | Published 2019-10-06 14:40

    Building the cache with build()

    static Cache<String, String> cache = CacheBuilder.newBuilder()
                .maximumSize(40000)
                .expireAfterWrite(3, TimeUnit.SECONDS)
                .expireAfterAccess(3, TimeUnit.SECONDS)
                .removalListener(notification -> {
                    System.out.println(Thread.currentThread().getName() + " -> remove : " + notification.getKey());
                    return;
                }).build();
    

    build() produces a cache instance that is internally made up of multiple segments. With the parameters above, 4 segments are created, and each segment's maximum capacity is 10000. The code is as follows:

    //new
    public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
        checkWeightWithWeigher();
        checkNonLoadingCache();
        //↓↓↓↓
        return new LocalCache.LocalManualCache<K1, V1>(this);
      }
    //manual cache  
     LocalManualCache(CacheBuilder<? super K, ? super V> builder) {
         //↓↓↓
         this(new LocalCache<K, V>(builder, null));
     }
    //the core construction logic
    LocalCache(
          CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
        concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
    
        keyStrength = builder.getKeyStrength();
        valueStrength = builder.getValueStrength();
    
        keyEquivalence = builder.getKeyEquivalence();
        valueEquivalence = builder.getValueEquivalence();
    
        maxWeight = builder.getMaximumWeight();
        weigher = builder.getWeigher();
        expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
        expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
        refreshNanos = builder.getRefreshNanos();
        //the removal listener configured on the builder is wired in here
        removalListener = builder.getRemovalListener();
        removalNotificationQueue =
            (removalListener == NullListener.INSTANCE)
                ? LocalCache.<RemovalNotification<K, V>>discardingQueue()
                : new ConcurrentLinkedQueue<RemovalNotification<K, V>>();
    
        ticker = builder.getTicker(recordsTime());
        entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
        globalStatsCounter = builder.getStatsCounterSupplier().get();
        defaultLoader = loader;
    
        int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
        if (evictsBySize() && !customWeigher()) {
          initialCapacity = Math.min(initialCapacity, (int) maxWeight);
        }
    
        // Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
        // maximumSize/Weight is specified in which case ensure that each segment gets at least 10
        // entries. The special casing for size-based eviction is only necessary because that eviction
        // happens per segment instead of globally, so too many segments compared to the maximum size
        // will result in random eviction behavior.
        int segmentShift = 0;
        int segmentCount = 1;
        //the segment count is computed here; with the parameters above it ends up as 4
        while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
          ++segmentShift;
          segmentCount <<= 1;
        }
        this.segmentShift = 32 - segmentShift;
        segmentMask = segmentCount - 1;
        //create the segment array
        this.segments = newSegmentArray(segmentCount);
        //compute each segment's capacity: the initial capacity is split across the four segments
        int segmentCapacity = initialCapacity / segmentCount;
        if (segmentCapacity * segmentCount < initialCapacity) {
          ++segmentCapacity;
        }
    
        int segmentSize = 1;
        while (segmentSize < segmentCapacity) {
          segmentSize <<= 1;
        }
        //whether eviction is size-based; in this example it is (maximumSize was set)
        if (evictsBySize()) {
          // Ensure sum of segment max weights = overall max weights
          long maxSegmentWeight = maxWeight / segmentCount + 1;
          long remainder = maxWeight % segmentCount;
          for (int i = 0; i < this.segments.length; ++i) {
            if (i == remainder) {
              maxSegmentWeight--;
            }
              //initialize each segment
            this.segments[i] =
                createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
          }
        } else {
          for (int i = 0; i < this.segments.length; ++i) {
              //if eviction is not size-based, the second argument is UNSET_INT (-1), meaning no upper bound
            this.segments[i] =
                createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
          }
        }
      }
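
    To make the segment math concrete, here is a small standalone sketch (an illustration of the same arithmetic, not Guava source) using the values implied by our builder: the default concurrencyLevel of 4 and maxWeight = 40000 from maximumSize(40000). It prints segmentCount = 4 and a maximum weight of 10000 per segment, matching the claim above.

    // Standalone sketch of the segment sizing (assumes concurrencyLevel = 4, maxWeight = 40000)
    public class SegmentMathSketch {
        public static void main(String[] args) {
            int concurrencyLevel = 4;   // CacheBuilder default
            long maxWeight = 40000;     // from maximumSize(40000)

            int segmentShift = 0;
            int segmentCount = 1;
            while (segmentCount < concurrencyLevel && segmentCount * 20 <= maxWeight) {
                ++segmentShift;
                segmentCount <<= 1;
            }
            System.out.println("segmentCount = " + segmentCount); // 4; LocalCache stores segmentShift as 32 - 2 = 30

            long maxSegmentWeight = maxWeight / segmentCount + 1; // 10001
            long remainder = maxWeight % segmentCount;            // 0
            for (int i = 0; i < segmentCount; i++) {
                if (i == remainder) {
                    maxSegmentWeight--;                           // drops to 10000 at i = 0
                }
                System.out.println("segment " + i + " maxSegmentWeight = " + maxSegmentWeight);
            }
        }
    }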
    

    Summary: Guava makes LocalManualCache and LocalLoadingCache static inner classes of LocalCache, and LocalManualCache holds a final LocalCache field that is initialized when the cache is built. Whether the cache object is a Cache or a LoadingCache, every user operation on it is therefore translated into an operation on that LocalCache.

    The put operation

    // what happens during a put
     cache.put("abc", "value");
    

    A put first hashes the key to find its segment and runs a pre-cleanup on that segment; only after that is the put itself performed, and further cleanup follows the put. Let's walk through the code step by step.
    Note that the put path cleans up the expired keys of the current segment.

    //# LocalManualCache
    @Override
    public void put(K key, V value) {
      localCache.put(key, value);
    }
    
    // put in LocalCache
    @Override
    public V put(K key, V value) {
      checkNotNull(key);
      checkNotNull(value);
      int hash = hash(key);
      return segmentFor(hash).put(key, hash, value, false);
    }
    

    First, the segment for the key is located based on its hash:

    /**
       * Returns the segment that should be used for a key with the given hash.
       *
       * @param hash the hash code for the key
       * @return the segment
       */
      Segment<K, V> segmentFor(int hash) {
        // TODO(fry): Lazily create segments?
        return segments[(hash >>> segmentShift) & segmentMask];
      }
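
    With the configuration in this example (4 segments), segmentShift is stored as 32 - 2 = 30 and segmentMask is 3, so the segment index is simply the top two bits of the (re-hashed) key hash. A few lines sketching the same arithmetic, using an arbitrary hash value:

    // Sketch: mapping a (re-hashed) hash onto one of the 4 segments of this example
    int segmentShift = 30;     // 32 - log2(segmentCount)
    int segmentMask = 3;       // segmentCount - 1
    int hash = 0xCAFEBABE;     // any smeared hash value
    int segmentIndex = (hash >>> segmentShift) & segmentMask;  // top 2 bits -> index 0..3 (here: 3)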
    

    Then the put is delegated to that segment. Each Segment holds a reference back to its LocalCache (the map field); its constructor is shown below.

    //note that the LocalCache is passed into the Segment constructor
     Segment(
           LocalCache<K, V> map,
           int initialCapacity,
           long maxSegmentWeight,
           StatsCounter statsCounter) {
         this.map = map;
         this.maxSegmentWeight = maxSegmentWeight;
         this.statsCounter = checkNotNull(statsCounter);
         initTable(newEntryArray(initialCapacity));
    
         keyReferenceQueue = map.usesKeyReferences() ? new ReferenceQueue<K>() : null;
    
         valueReferenceQueue = map.usesValueReferences() ? new ReferenceQueue<V>() : null;
    
         recencyQueue =
             map.usesAccessQueue()
                 ? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>()
                 : LocalCache.<ReferenceEntry<K, V>>discardingQueue();
    
         writeQueue =
             map.usesWriteQueue()
                 ? new WriteQueue<K, V>()
                 : LocalCache.<ReferenceEntry<K, V>>discardingQueue();
    
         accessQueue =
             map.usesAccessQueue()
                 ? new AccessQueue<K, V>()
                 : LocalCache.<ReferenceEntry<K, V>>discardingQueue();
       }
    

    Below is the core put logic. Pay close attention to the commented spots: the pre-cleanup, and the notification at the end.

    // the put path includes preWriteCleanup and, in the finally block, postWriteCleanup
    @Nullable
        V put(K key, int hash, V value, boolean onlyIfAbsent) {
          lock();
          try {
            long now = map.ticker.read();
            //pre-write cleanup: this removes all expired entries in the current segment
            preWriteCleanup(now);
    
            int newCount = this.count + 1;
            if (newCount > this.threshold) { // ensure capacity
              expand();
              newCount = this.count + 1;
            }
    
            AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
            int index = hash & (table.length() - 1);
            ReferenceEntry<K, V> first = table.get(index);
    
            // Look for an existing entry.
            for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
              K entryKey = e.getKey();
              if (e.getHash() == hash
                  && entryKey != null
                  && map.keyEquivalence.equivalent(key, entryKey)) {
                // We found an existing entry.
    
                ValueReference<K, V> valueReference = e.getValueReference();
                V entryValue = valueReference.get();
    
                if (entryValue == null) {
                  ++modCount;
                  if (valueReference.isActive()) {
                    enqueueNotification(
                        key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
                    setValue(e, key, value, now);
                    newCount = this.count; // count remains unchanged
                  } else {
                    setValue(e, key, value, now);
                    newCount = this.count + 1;
                  }
                  this.count = newCount; // write-volatile
                  evictEntries(e);
                  return null;
                } else if (onlyIfAbsent) {
                  // Mimic
                  // "if (!map.containsKey(key)) ...
                  // else return map.get(key);
                  recordLockedRead(e, now);
                  return entryValue;
                } else {
                  // clobber existing entry, count remains unchanged
                  ++modCount;
                  enqueueNotification(
                      key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
                  setValue(e, key, value, now);
                  evictEntries(e);
                  return entryValue;
                }
              }
            }
    
            // Create a new entry.
            ++modCount;
            ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
            setValue(newEntry, key, value, now);
            table.set(index, newEntry);
            newCount = this.count + 1;
            this.count = newCount; // write-volatile
            evictEntries(newEntry);
            return null;
          } finally {
            unlock();
            //expired-entry notifications are delivered here: the pre-cleanup above already queued the expired entries, and this call invokes the listener for each of them, triggering the user callback
            postWriteCleanup();
          }
        }
    

    Now let's take a closer look at the pre-cleanup and its notifications.

    @GuardedBy("this")
    void preWriteCleanup(long now) {
      runLockedCleanup(now);
    }
    //
     void runLockedCleanup(long now) {
      if (tryLock()) {
        try {
          drainReferenceQueues();
          //this expireEntries call is the key
          expireEntries(now); // calls drainRecencyQueue
          readCount.set(0);
        } finally {
          unlock();
        }
      }
    }
    
    //
    @GuardedBy("this")
    void expireEntries(long now) {
     drainRecencyQueue();
    
     ReferenceEntry<K, V> e;
     //this loops over the queued entries and checks whether each one has expired; expired entries are removed, so focus on the removeEntry call
     while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
       if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
         throw new AssertionError();
       }
     }
     while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
       if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
         throw new AssertionError();
       }
     }
    }
    
    //in removeEntry, focus on removeValueFromChain, which performs the actual cleanup
    @VisibleForTesting
    @GuardedBy("this")
    boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
      int newCount = this.count - 1;
      AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
      int index = hash & (table.length() - 1);
      ReferenceEntry<K, V> first = table.get(index);
    
      for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
        if (e == entry) {
          ++modCount;
          ReferenceEntry<K, V> newFirst =
              removeValueFromChain(
                  first,
                  e,
                  e.getKey(),
                  hash,
                  e.getValueReference().get(),
                  e.getValueReference(),
                  cause);
          newCount = this.count - 1;
          table.set(index, newFirst);
          this.count = newCount; // write-volatile
          return true;
        }
      }
      return false;
    }
    
    //focus on enqueueNotification
    @GuardedBy("this")
    @Nullable
    ReferenceEntry<K, V> removeValueFromChain(
        ReferenceEntry<K, V> first,
        ReferenceEntry<K, V> entry,
        @Nullable K key,
        int hash,
        V value,
        ValueReference<K, V> valueReference,
        RemovalCause cause) {
      //this wraps the entry being removed into a notification; after the put completes, the finally-block cleanup delivers these notifications one by one
      enqueueNotification(key, hash, value, valueReference.getWeight(), cause);
      writeQueue.remove(entry);
      accessQueue.remove(entry);
    
      if (valueReference.isLoading()) {
        valueReference.notifyNewValue(null);
        return first;
      } else {
        return removeEntryFromChain(first, entry);
      }
    }
    
    //this method builds a notification object for the removed key and offers it into removalNotificationQueue
    @GuardedBy("this")
    void enqueueNotification(
        @Nullable K key, int hash, @Nullable V value, int weight, RemovalCause cause) {
      totalWeight -= weight;
      if (cause.wasEvicted()) {
        statsCounter.recordEviction();
      }
      if (map.removalNotificationQueue != DISCARDING_QUEUE) {
        RemovalNotification<K, V> notification = RemovalNotification.create(key, value, cause);
          // build the notification and put it into the queue for the later callback
        map.removalNotificationQueue.offer(notification);
      }
    }
    
    //////////////// pre-cleanup complete /////////////////////////
    ////// next, the notification logic /////////////////////////////
    //the finally block after put calls `postWriteCleanup`
    /**
    * Performs routine cleanup following a write.
    */
    void postWriteCleanup() {
      runUnlockedCleanup();
    }
    //
    void runUnlockedCleanup() {
     // locked cleanup may generate notifications we can send unlocked
      if (!isHeldByCurrentThread()) {
        map.processPendingNotifications();
      }
    }
    //removalListener.onRemoval is where the user-supplied listener is called back
    void processPendingNotifications() {
      RemovalNotification<K, V> notification;
      while ((notification = removalNotificationQueue.poll()) != null) {
        try {
          removalListener.onRemoval(notification);
        } catch (Throwable e) {
          logger.log(Level.WARNING, "Exception thrown by removal listener", e);
        }
      }
    }
    

    Summary: a put cleans up expired entries, and it does so in the same thread as the put itself. It does not clean the whole cache, though: only the keys in the segment that the key hashes to are cleaned.
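
    A small runnable sketch of that behavior (illustrative only, and timing-dependent): overwrite the same key after it has expired, and the EXPIRED notification is printed from the thread doing the put.

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.util.concurrent.TimeUnit;

    public class PutCleanupDemo {
        public static void main(String[] args) throws InterruptedException {
            Cache<String, String> cache = CacheBuilder.newBuilder()
                    .expireAfterWrite(3, TimeUnit.SECONDS)
                    .removalListener(n -> System.out.println(
                            Thread.currentThread().getName() + " -> " + n.getKey() + " : " + n.getCause()))
                    .build();

            cache.put("a", "v1");
            Thread.sleep(4000);    // let the entry expire
            cache.put("a", "v2");  // preWriteCleanup removes the expired entry;
                                   // postWriteCleanup then typically prints "main -> a : EXPIRED" right here
        }
    }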

    The get operation

    Basic usage of get() looks like this:

    //a loader can be passed in; on a cache miss, the loader is executed to obtain the value
    cache.get("adder", () -> {
        return "a";
    });
    

    The basic flow of get is: hash the key to find its segment, then look up the entry in that segment. Cleanup also happens along the way: expired entries are deleted directly and notifications are sent. The code is as follows:

    //LocalManualCache
    @Override
    public V get(K key, final Callable<? extends V> valueLoader) throws ExecutionException {
        checkNotNull(valueLoader);
        // ↓↓↓↓
        return localCache.get(
            key,
            new CacheLoader<Object, V>() {
                @Override
                public V load(Object key) throws Exception {
                    return valueLoader.call();
                }
            });
    }
    //
     V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
         checkNotNull(key);
         checkNotNull(loader);
         try {
             if (count != 0) { // read-volatile
                 // don't call getLiveEntry, which would ignore loading values
                 ReferenceEntry<K, V> e = getEntry(key, hash);
                 if (e != null) {
                     long now = map.ticker.read();
                     V value = getLiveValue(e, now);
                     if (value != null) {
                         recordRead(e, now);
                         statsCounter.recordHits(1);
                         return scheduleRefresh(e, key, hash, value, now, loader);
                     }
                     ValueReference<K, V> valueReference = e.getValueReference();
                     if (valueReference.isLoading()) {
                         return waitForLoadingValue(e, key, valueReference);
                     }
                 }
             }
        //the key logic: perform the load and the corresponding cleanup
             //↓↓↓↓↓↓↓↓↓↓↓↓
             // 
             return lockedGetOrLoad(key, hash, loader);
         } catch (ExecutionException ee) {
             Throwable cause = ee.getCause();
             if (cause instanceof Error) {
                 throw new ExecutionError((Error) cause);
             } else if (cause instanceof RuntimeException) {
                 throw new UncheckedExecutionException(cause);
             }
             throw ee;
         } finally {
             postReadCleanup();
         }
     }
    
    //
    V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
        ReferenceEntry<K, V> e;
        ValueReference<K, V> valueReference = null;
        LoadingValueReference<K, V> loadingValueReference = null;
        boolean createNewEntry = true;
    
        lock();
        try {
            // re-read ticker once inside the lock
            long now = map.ticker.read();
            //pre-write cleanup: expired entries are removed immediately and turned into notifications queued for delivery
            //once the method finishes, the user-supplied listener is called back
            preWriteCleanup(now);
    
            int newCount = this.count - 1;
            AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
            int index = hash & (table.length() - 1);
            ReferenceEntry<K, V> first = table.get(index);
    
            for (e = first; e != null; e = e.getNext()) {
                K entryKey = e.getKey();
                if (e.getHash() == hash
                    && entryKey != null
                    && map.keyEquivalence.equivalent(key, entryKey)) {
                    valueReference = e.getValueReference();
                    if (valueReference.isLoading()) {
                        createNewEntry = false;
                    } else {
                        V value = valueReference.get();
                        if (value == null) {
                            enqueueNotification(
                                entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
                        } else if (map.isExpired(e, now)) {
                            // This is a duplicate check, as preWriteCleanup already purged expired
                            // entries, but let's accomodate an incorrect expiration queue.
                            enqueueNotification(
                                entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
                        } else {
                            recordLockedRead(e, now);
                            statsCounter.recordHits(1);
                            // we were concurrent with loading; don't consider refresh
                            return value;
                        }
    
                        // immediately reuse invalid entries
                        writeQueue.remove(e);
                        accessQueue.remove(e);
                        this.count = newCount; // write-volatile
                    }
                    break;
                }
            }
    
            if (createNewEntry) {
                loadingValueReference = new LoadingValueReference<K, V>();
    
                if (e == null) {
                    e = newEntry(key, hash, first);
                    e.setValueReference(loadingValueReference);
                    table.set(index, e);
                } else {
                    e.setValueReference(loadingValueReference);
                }
            }
        } finally {
            unlock();
            //deliver the notifications generated by the cleanup, invoking the user-supplied listener
            postWriteCleanup();
        }
    
        if (createNewEntry) {
            try {
                // Synchronizes on the entry to allow failing fast when a recursive load is
                // detected. This may be circumvented when an entry is copied, but will fail fast most
                // of the time.
                synchronized (e) {
                    return loadSync(key, hash, loadingValueReference, loader);
                }
            } finally {
                statsCounter.recordMisses(1);
            }
        } else {
            // The entry already exists. Wait for loading.
            return waitForLoadingValue(e, key, valueReference);
        }
    }
    

    Summary: in the core get logic above, again focus on the pre-cleanup and the notification; their logic is the same as in the put path described earlier (for get it runs on the lockedGetOrLoad path). From this we can conclude that both put and get check for, clean up, and notify about expired entries, and they do so in the calling thread itself; no separate thread is started.
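
    For completeness, a similar timing-dependent sketch (illustrative only) for the loading path: after the entry expires, the next get runs the Callable in the calling thread, and the EXPIRED notification for the old value is delivered on that same thread.

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.util.concurrent.TimeUnit;

    public class GetCleanupDemo {
        public static void main(String[] args) throws Exception {
            Cache<String, String> cache = CacheBuilder.newBuilder()
                    .expireAfterWrite(3, TimeUnit.SECONDS)
                    .removalListener(n -> System.out.println(
                            Thread.currentThread().getName() + " -> " + n.getKey() + " : " + n.getCause()))
                    .build();

            System.out.println(cache.get("k", () -> "v1"));  // miss: the Callable runs on this thread
            Thread.sleep(4000);                              // let the entry expire
            // lockedGetOrLoad: preWriteCleanup expires the old entry, postWriteCleanup delivers
            // the EXPIRED notification on this thread, then the Callable reloads the value
            System.out.println(cache.get("k", () -> "v2"));
        }
    }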

    The getIfPresent operation

    This method returns the cached value directly if the key is present, otherwise it returns null. How is cleanup implemented here? Is it the same as in put/get?

    //usage example
    cache.getIfPresent("test");
    //returns null if the key is absent
    

    Let's look at its lookup logic and see whether it is consistent with the get operation.

    @Override
    @Nullable
    public V getIfPresent(Object key) {
        //↓↓↓↓↓↓↓
        return localCache.getIfPresent(key);
    }
    //again, the key is hashed first to locate the corresponding segment
    @Nullable
    public V getIfPresent(Object key) {
        int hash = hash(checkNotNull(key));
        //↓↓↓↓↓↓↓
        V value = segmentFor(hash).get(key, hash);
        if (value == null) {
            globalStatsCounter.recordMisses(1);
        } else {
            globalStatsCounter.recordHits(1);
        }
        return value;
    }
    //
    

    Now let's look at the core logic of the segment-level get:

    //note postReadCleanup: since it is in the finally block, it is always executed
    @Nullable
    V get(Object key, int hash) {
        try {
            if (count != 0) { // read-volatile
                long now = map.ticker.read();
                ReferenceEntry<K, V> e = getLiveEntry(key, hash, now);
                if (e == null) {
                    return null;
                }
    
                V value = e.getValueReference().get();
                if (value != null) {
                    recordRead(e, now);
                    return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);
                }
                tryDrainReferenceQueues();
            }
            return null;
        } finally {
            //cleanup is performed here too; does that mean every read also cleans up expired keys?
            postReadCleanup();
        }
    }
    

    The cleanup logic of postReadCleanup:

    //there is a readCount here that is incremented on every read
    //it is per segment: each segment increments its own counter
    void postReadCleanup() {
        if ((readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0) {
            cleanUp();
        }
    }
    //the cleanUp logic
     void cleanUp() {
         long now = map.ticker.read();
         //run the locked cleanup
         runLockedCleanup(now);
         //deliver the removal notifications
         runUnlockedCleanup();
     }
    

    As the code above shows, cleanup is attempted on every read, but there is a precondition: (readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0 must hold before cleanUp() runs. In the Guava source DRAIN_THRESHOLD is 0x3F (63), so the condition is satisfied once every 64 reads; in other words, getIfPresent has to be called 64 times before a cleanup pass is executed.
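
    A quick sketch of that mask check: with DRAIN_THRESHOLD = 0x3F, the condition holds exactly when the read counter is a multiple of 64.

    // Sketch: the mask check fires once every 64 reads (per segment)
    int DRAIN_THRESHOLD = 0x3F;  // 63
    int cleanups = 0;
    for (int readCount = 1; readCount <= 256; readCount++) {
        if ((readCount & DRAIN_THRESHOLD) == 0) {
            cleanups++;          // triggered at 64, 128, 192, 256
        }
    }
    System.out.println(cleanups);  // prints 4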

    Summary: unlike put and get, getIfPresent does not maintain expired keys on every call; a cleanup pass runs only once per 64 reads of a segment.
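
    One practical consequence: a cache that is read but rarely written can hold on to expired entries for a while. If that matters, Cache.cleanUp() can be called explicitly to trigger the same maintenance pass (expiry plus notifications) that put/get perform internally:

    // Force the maintenance work (expiry + removal notifications) on demand
    cache.cleanUp();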
