Guava Cache Source Code Analysis (Part 2)

Author: 一只小哈 | Published 2016-12-24 17:47

    ### 1. Guava Cache's Design Philosophy

    The previous short article gave a brief overview of Guava Cache's features, such as cache eviction, removal listeners, and cache refreshing. This time I'll focus on how Guava Cache actually implements those features.
    The Guava Cache source code lives at https://github.com/google/guava
    Guava Cache's design resembles ConcurrentHashMap's: it relies mainly on fine-grained locking to reduce contention, and on hashing to speed up lookups. Unlike ConcurrentHashMap, though, Guava Cache has to support a rich set of cache features, so its design is considerably more complex.

    ### 2. Source Code Analysis

    Here we'll take LoadingCache as the example for analyzing Guava Cache's structure and implementation. First, the example from the wiki:

    LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder()
           .maximumSize(1000)
           .expireAfterWrite(10, TimeUnit.MINUTES)
           .removalListener(MY_LISTENER)
           .build(
               new CacheLoader<Key, Graph>() {
                 public Graph load(Key key) throws AnyException {
                   return createExpensiveGraph(key);
                 }
               });
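
    As a quick usage note (Key, Graph, and createExpensiveGraph are the wiki's placeholder types): values are then fetched through get, which loads them on a miss.

        Graph graph = graphs.get(key);          // loads via the CacheLoader on a miss
        Graph other = graphs.getUnchecked(key); // same, but wraps loader errors unchecked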
    

    Guava Cache uses the builder pattern here: every configuration method on CacheBuilder returns the builder itself, until build() is finally called.
    Let's first look at CacheBuilder's individual methods:

       /**
       *
       * Specifies an upper bound on the cache size; as the cache approaches the limit, entries that are not commonly used are evicted.
       * Specifies the maximum number of entries the cache may contain. Note that the cache <b>may evict
       * an entry before this limit is exceeded</b>. As the cache size grows close to the maximum, the
       * cache evicts entries that are less likely to be used again. For example, the cache may evict an
       * entry because it hasn't been used recently or very often.
       *
       * <p>When {@code size} is zero, elements will be evicted immediately after being loaded into the
       * cache. This can be useful in testing, or to disable caching temporarily without a code change.
       *
       * <p>This feature cannot be used in conjunction with {@link #maximumWeight}.
       *
       * @param size the maximum size of the cache
       * @return this {@code CacheBuilder} instance (for chaining)
       * @throws IllegalArgumentException if {@code size} is negative
       * @throws IllegalStateException if a maximum size or weight was already set
       */
      public CacheBuilder<K, V> maximumSize(long size) {
        checkState(
            this.maximumSize == UNSET_INT, "maximum size was already set to %s", this.maximumSize);
        checkState(
            this.maximumWeight == UNSET_INT,
            "maximum weight was already set to %s",
            this.maximumWeight);
        checkState(this.weigher == null, "maximum size can not be combined with weigher");
        checkArgument(size >= 0, "maximum size must not be negative");
        this.maximumSize = size;
        return this;
      }
    

    After the state checks, it simply performs an assignment.
    The same pattern applies to the other builder methods:

     public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
        checkState(
            expireAfterWriteNanos == UNSET_INT,
            "expireAfterWrite was already set to %s ns",
            expireAfterWriteNanos);
        checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
        this.expireAfterWriteNanos = unit.toNanos(duration);
        return this;
      }
    
      public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> removalListener(
          RemovalListener<? super K1, ? super V1> listener) {
        checkState(this.removalListener == null);
    
        // safely limiting the kinds of caches this can produce
        @SuppressWarnings("unchecked")
        CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;
        me.removalListener = checkNotNull(listener);
        return me;
      }
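
    As a side note, here is a minimal sketch of how these preconditions surface to callers (assuming Guava is on the classpath): re-setting a bound fails fast with the messages seen above.

        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder().maximumSize(1000);
        // builder.maximumSize(2000);               would throw IllegalStateException:
        //                                          "maximum size was already set to 1000"
        // CacheBuilder.newBuilder().maximumSize(-1); would throw IllegalArgumentException:
        //                                          "maximum size must not be negative"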
    

    Then build() is executed:

      public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
          CacheLoader<? super K1, V1> loader) {
        checkWeightWithWeigher();
        return new LocalCache.LocalLoadingCache<K1, V1>(this, loader);
      }
    

    This returns a LocalCache.LocalLoadingCache, an inner class of LocalCache. Here Guava Cache's real storage structure appears: LocalLoadingCache extends LocalManualCache and implements the LoadingCache interface. At instantiation time a LocalCache is constructed from the CacheBuilder, and LoadingCache and LocalManualCache merely act as proxies over that LocalCache.

    LocalLoadingCache(
        CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {
      super(new LocalCache<K, V>(builder, checkNotNull(loader)));
    }

    private LocalManualCache(LocalCache<K, V> localCache) {
      this.localCache = localCache;
    }
    

    So what does constructing a LocalCache look like?

      LocalCache(
          CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
        // concurrency level, i.e. the number of segments
        concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
        // reference strength for keys
        keyStrength = builder.getKeyStrength();
        // reference strength for values
        valueStrength = builder.getValueStrength();
        // equivalence comparators, analogous to Object.equals
        keyEquivalence = builder.getKeyEquivalence();
        valueEquivalence = builder.getValueEquivalence();
        // maximum total weight; if no weigher is set, maxWeight == maximumSize
        maxWeight = builder.getMaximumWeight();
        // per-entry weigher, used by the eviction policy
        weigher = builder.getWeigher();
        // how long after the last access an entry expires
        expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
        // how long after a write an entry expires
        expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
        // refresh interval
        refreshNanos = builder.getRefreshNanos();
        // listener invoked after an entry is removed
        removalListener = builder.getRemovalListener();
        // queue of pending removal notifications
        removalNotificationQueue =
            (removalListener == NullListener.INSTANCE)
                ? LocalCache.<RemovalNotification<K, V>>discardingQueue()
                : new ConcurrentLinkedQueue<RemovalNotification<K, V>>();
        // clock source
        ticker = builder.getTicker(recordsTime());
        // factory for creating entries
        entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
        // stats counter, used e.g. for hit-rate accounting
        globalStatsCounter = builder.getStatsCounterSupplier().get();

        // the loader used to load values
        defaultLoader = loader;

        // initial capacity of the hash table
        int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);

        // size-based eviction without a custom weigher: cap the initial capacity at maxWeight
        if (evictsBySize() && !customWeigher()) {
          initialCapacity = Math.min(initialCapacity, (int) maxWeight);
        }

        // Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
        // maximumSize/Weight is specified in which case ensure that each segment gets at least 10
        // entries. The special casing for size-based eviction is only necessary because that eviction
        // happens per segment instead of globally, so too many segments compared to the maximum size
        // will result in random eviction behavior.

        // analogous to ConcurrentHashMap
        int segmentShift = 0; // used to derive the segment selection shift
        int segmentCount = 1; // the number of segments
        // grow while the segment count is below the concurrency level (default 4);
        // with the default maxWeight of -1, size-based eviction is off
        while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
          ++segmentShift;
          segmentCount <<= 1;
        }
        this.segmentShift = 32 - segmentShift;
        segmentMask = segmentCount - 1;

        this.segments = newSegmentArray(segmentCount);

        int segmentCapacity = initialCapacity / segmentCount;
        if (segmentCapacity * segmentCount < initialCapacity) {
          ++segmentCapacity;
        }

        int segmentSize = 1;
        while (segmentSize < segmentCapacity) {
          segmentSize <<= 1;
        }
        // eviction is off by default
        if (evictsBySize()) {
          // Ensure sum of segment max weights = overall max weights
          long maxSegmentWeight = maxWeight / segmentCount + 1;
          long remainder = maxWeight % segmentCount;
          for (int i = 0; i < this.segments.length; ++i) {
            if (i == remainder) {
              maxSegmentWeight--;
            }
            this.segments[i] =
                createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
          }
        } else {
          // initialize every segment
          for (int i = 0; i < this.segments.length; ++i) {
            this.segments[i] =
                createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
          }
        }
      }
    

    Initialization mostly wires up configuration; you can see it is essentially the same as ConcurrentHashMap's, with a few extra concepts layered on top.
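
    To make the segment sizing concrete, here is a minimal standalone sketch (my own re-implementation of the loop above, not Guava code), run with the wiki example's settings: the default concurrencyLevel of 4 and maximumSize(1000) with no weigher, so maxWeight == 1000.

        public class SegmentSizing {
          public static void main(String[] args) {
            int concurrencyLevel = 4;   // CacheBuilder default
            long maxWeight = 1000;      // maximumSize(1000), no weigher
            boolean evictsBySize = maxWeight >= 0;

            int segmentShift = 0;
            int segmentCount = 1;
            while (segmentCount < concurrencyLevel
                && (!evictsBySize || segmentCount * 20 <= maxWeight)) {
              ++segmentShift;
              segmentCount <<= 1;
            }
            // prints "4 segments, mask=3, shift=30"
            System.out.println(segmentCount + " segments, mask=" + (segmentCount - 1)
                + ", shift=" + (32 - segmentShift));
          }
        }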

    Now let's turn back to the two most important methods, starting with put:

        @Override
        public void put(K key, V value) {
          localCache.put(key, value);
        }
      /**
       * Delegates to the appropriate segment's put method.
       * @param key
       * @param value
       * @return
       */
      @Override
      public V put(K key, V value) {
        checkNotNull(key);
        checkNotNull(value);
        int hash = hash(key);
        return segmentFor(hash).put(key, hash, value, false);
      }
        @Nullable
        V put(K key, int hash, V value, boolean onlyIfAbsent) {
          // lock to guarantee thread safety
          lock();
          try {
            // read the current time
            long now = map.ticker.read();
            // drain the cleanup queues before writing
            preWriteCleanup(now);
            // the segment's count + 1
            int newCount = this.count + 1;
            // resize if needed
            if (newCount > this.threshold) { // ensure capacity
              expand();
              newCount = this.count + 1;
            }
            // the segment's hash table, an array of entry chains
            AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
            // locate the bucket
            int index = hash & (table.length() - 1);
            // head of the chain
            ReferenceEntry<K, V> first = table.get(index);
            // walk the entry chain
            // Look for an existing entry.
            for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
              K entryKey = e.getKey();
              if (e.getHash() == hash
                  && entryKey != null
                  && map.keyEquivalence.equivalent(key, entryKey)) {
                // We found an existing entry.
                ValueReference<K, V> valueReference = e.getValueReference();
                // read its value
                V entryValue = valueReference.get();
                // a null value means it may have been garbage-collected
                if (entryValue == null) {
                  ++modCount;
                  if (valueReference.isActive()) {
                    enqueueNotification( // enqueue instead of notifying inline, keeping lock hold time short
                        key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
                    // reuse the existing key and set the fresh value
                    setValue(e, key, value, now); // stores the value and records the entry in both queues
                    newCount = this.count; // count remains unchanged
                  } else {
                    setValue(e, key, value, now); // stores the value and records the entry in both queues
                    newCount = this.count + 1;
                  }
                  this.count = newCount; // write-volatile, for memory visibility
                  // run eviction
                  evictEntries(e);
                  return null;
                } else if (onlyIfAbsent) { // the key already has a live value, so treat this as a read,
                                           // which must update the access queue
                  // Mimic
                  // "if (!map.containsKey(key)) ...
                  // else return map.get(key);
                  recordLockedRead(e, now);
                  return entryValue;
                } else {
                  // the value is non-null, so overwrite it
                  // clobber existing entry, count remains unchanged
                  ++modCount;
                  // enqueue a REPLACED notification
                  enqueueNotification(
                      key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
                  setValue(e, key, value, now); // stores the value and records the entry in both queues
                  // run eviction
                  evictEntries(e);
                  return entryValue;
                }
              }
            }
            // no matching entry: create a new one
            // Create a new entry.
            ++modCount;
            ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
            setValue(newEntry, key, value, now);
            table.set(index, newEntry);
            newCount = this.count + 1;
            this.count = newCount; // write-volatile
            // evict if over capacity
            evictEntries(newEntry);
            return null;
          } finally {
            // unlock
            unlock();
            // process the removal notifications enqueued above
            postWriteCleanup();
          }
        }
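
    As a point of reference, segmentFor picks a segment from the hash using the shift and mask computed in the constructor; in the Guava source it is essentially:

        Segment<K, V> segmentFor(int hash) {
          // use the high-order bits of the (smeared) hash to select a segment
          return segments[(hash >>> segmentShift) & segmentMask];
        }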
    
    

    The code is long and admittedly a bit unpleasant to read. The comments cover most of it, so let me call out a few points worth attention:

    1. Locking: as in ConcurrentHashMap, locking guarantees thread safety.
    2. preWriteCleanup: a cleanup runs before every put. Cleaning what? Look at the code:
        @GuardedBy("this")
        void preWriteCleanup(long now) {
          runLockedCleanup(now);
        }
        void runLockedCleanup(long now) {
          if (tryLock()) {
            try {
              drainReferenceQueues();
              expireEntries(now); // calls drainRecencyQueue
              readCount.set(0);
            } finally {
              unlock();
            }
          }
        }
        @GuardedBy("this")
        void drainReferenceQueues() {
          if (map.usesKeyReferences()) {
            drainKeyReferenceQueue();
          }
          if (map.usesValueReferences()) {
            drainValueReferenceQueue();
          }
        }
        @GuardedBy("this")
        void drainKeyReferenceQueue() {
          Reference<? extends K> ref;
          int i = 0;
          while ((ref = keyReferenceQueue.poll()) != null) {
            @SuppressWarnings("unchecked")
            ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
            map.reclaimKey(entry);
            if (++i == DRAIN_MAX) {
              break;
            }
          }
        }
    
    

    This may look puzzling at first, but all it really does is drain two queues, keyReferenceQueue and valueReferenceQueue. What are they? They are reference queues.
    To support weak and soft references, Guava Cache introduces these reference-cleanup queues and wraps keys and values as keyReference and valueReference.
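
    For intuition, here is a minimal standalone sketch (plain JDK, no Guava) of the java.lang.ref mechanism involved: when the referent of a WeakReference registered with a ReferenceQueue is garbage-collected, the JVM enqueues the cleared reference onto that queue.

        import java.lang.ref.Reference;
        import java.lang.ref.ReferenceQueue;
        import java.lang.ref.WeakReference;

        public class ReferenceQueueDemo {
          public static void main(String[] args) throws InterruptedException {
            ReferenceQueue<Object> queue = new ReferenceQueue<Object>();
            Object key = new Object();
            WeakReference<Object> ref = new WeakReference<Object>(key, queue);

            key = null;   // drop the only strong reference
            System.gc();  // only a hint to the JVM, hence the timed wait below

            Reference<?> reclaimed = queue.remove(1000); // wait up to 1s
            System.out.println(reclaimed == ref); // typically true once the GC has run
          }
        }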
    When an entry is created:

        @GuardedBy("this")
        ReferenceEntry<K, V> newEntry(K key, int hash, @Nullable ReferenceEntry<K, V> next) {
          return map.entryFactory.newEntry(this, checkNotNull(key), hash, next);
        }
    

    the entry is built via map.entryFactory. The factory itself is initialized through

    entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
    

    keyStrength is the reference strength we specified at initialization. The available factories are:

        static final EntryFactory[] factories = {
          STRONG,
          STRONG_ACCESS,
          STRONG_WRITE,
          STRONG_ACCESS_WRITE,
          WEAK,
          WEAK_ACCESS,
          WEAK_WRITE,
          WEAK_ACCESS_WRITE,
        };
    

    The matching factory creates the corresponding entry type. Let's look at WeakEntry in particular:

        WEAK {
          @Override
          <K, V> ReferenceEntry<K, V> newEntry(
              Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
            return new WeakEntry<K, V>(segment.keyReferenceQueue, key, hash, next);
          }
        },
    
      /**
       * Used for weakly-referenced keys.
       */
      static class WeakEntry<K, V> extends WeakReference<K> implements ReferenceEntry<K, V> {
        WeakEntry(ReferenceQueue<K> queue, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
          super(key, queue);
          this.hash = hash;
          this.next = next;
        }
    
        @Override
        public K getKey() {
          return get();
        }
    
        /*
         * It'd be nice to get these for free from AbstractReferenceEntry, but we're already extending
         * WeakReference<K>.
         */
    
        // null access
    
        @Override
        public long getAccessTime() {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public void setAccessTime(long time) {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public ReferenceEntry<K, V> getNextInAccessQueue() {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public ReferenceEntry<K, V> getPreviousInAccessQueue() {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
          throw new UnsupportedOperationException();
        }
    
        // null write
    
        @Override
        public long getWriteTime() {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public void setWriteTime(long time) {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public ReferenceEntry<K, V> getNextInWriteQueue() {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public void setNextInWriteQueue(ReferenceEntry<K, V> next) {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public ReferenceEntry<K, V> getPreviousInWriteQueue() {
          throw new UnsupportedOperationException();
        }
    
        @Override
        public void setPreviousInWriteQueue(ReferenceEntry<K, V> previous) {
          throw new UnsupportedOperationException();
        }
    
        // The code below is exactly the same for each entry type.
    
        final int hash;
        final ReferenceEntry<K, V> next;
        volatile ValueReference<K, V> valueReference = unset();
    
        @Override
        public ValueReference<K, V> getValueReference() {
          return valueReference;
        }
    
        @Override
        public void setValueReference(ValueReference<K, V> valueReference) {
          this.valueReference = valueReference;
        }
    
        @Override
        public int getHash() {
          return hash;
        }
    
        @Override
        public ReferenceEntry<K, V> getNext() {
          return next;
        }
      }
    

    WeakEntry extends WeakReference and implements ReferenceEntry; in other words, the entry itself is a weak reference to its key, so the key (and likewise the value) may be reclaimed at any time. Note the constructor parameter ReferenceQueue<K> queue: that is the keyReferenceQueue we saw above. When the key is garbage-collected, the JVM automatically enqueues the cleared reference onto that queue, which is how the corresponding entry can later be found and cleaned up. Values work the same way. Rather clever, isn't it?
    Back to the main thread: draining the keyReferenceQueue:

        @GuardedBy("this")
        void drainKeyReferenceQueue() {
          Reference<? extends K> ref;
          int i = 0;
          while ((ref = keyReferenceQueue.poll()) != null) {
            @SuppressWarnings("unchecked")
            ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
            map.reclaimKey(entry);
            if (++i == DRAIN_MAX) {
              break;
            }
          }
        }
    
      void reclaimKey(ReferenceEntry<K, V> entry) {
        int hash = entry.getHash();
        segmentFor(hash).reclaimKey(entry, hash);
      }
    
        /**
         * Removes an entry whose key has been garbage collected.
         */
        boolean reclaimKey(ReferenceEntry<K, V> entry, int hash) {
          lock();
          try {
            int newCount = count - 1;
            AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
            int index = hash & (table.length() - 1);
            ReferenceEntry<K, V> first = table.get(index);
    
            for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
              if (e == entry) {
                ++modCount;
                ReferenceEntry<K, V> newFirst =
                    removeValueFromChain(
                        first,
                        e,
                        e.getKey(),
                        hash,
                        e.getValueReference().get(),
                        e.getValueReference(),
                        RemovalCause.COLLECTED);
                newCount = this.count - 1;
                table.set(index, newFirst);
                this.count = newCount; // write-volatile
                return true;
              }
            }
    
            return false;
          } finally {
            unlock();
            postWriteCleanup();
          }
        }
    
    

    That is the whole cleanup process: if a key or value has been garbage-collected, its entry gets reclaimed as part of the next put.
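
    These reference-based features are switched on via the builder. A minimal sketch (note that weakKeys() also switches key comparison to identity ==, a documented side effect):

        Cache<Object, Object> cache = CacheBuilder.newBuilder()
            .weakKeys()   // keys held through WeakReference; compared by identity
            .weakValues() // values held through WeakReference
            .build();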
    3. What does setValue actually do? setValue writes the value into the entry. Since this is a write, it also refreshes the last-write time. What bookkeeping backs this up?

        /**
         * Sets a new value of an entry. Adds newly created entries at the end of the access queue.
         */
        @GuardedBy("this")
        void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
          ValueReference<K, V> previous = entry.getValueReference();
          int weight = map.weigher.weigh(key, value);
          checkState(weight >= 0, "Weights must be non-negative");
    
          ValueReference<K, V> valueReference =
              map.valueStrength.referenceValue(this, entry, value, weight);
          entry.setValueReference(valueReference);
          // record the write in the eviction metadata (the queues)
          recordWrite(entry, weight, now);
          previous.notifyNewValue(value);
        }
    
            /**
         * Updates eviction metadata that {@code entry} was just written. This currently amounts to
         * adding {@code entry} to relevant eviction lists.
         */
        @GuardedBy("this")
        void recordWrite(ReferenceEntry<K, V> entry, int weight, long now) {
          // we are already under lock, so drain the recency queue immediately
          drainRecencyQueue();
          totalWeight += weight;
    
          if (map.recordsAccess()) {
            entry.setAccessTime(now);
          }
          if (map.recordsWrite()) {
            entry.setWriteTime(now);
          }
          accessQueue.add(entry);
          writeQueue.add(entry);
        }
    

    Guava Cache in fact maintains two queues, a write queue and an access queue, and uses them to implement expiration by recency of write and recency of access. We can guess that these queues must preserve order while also locating elements quickly. Take the access queue as an example:

      /**
       * A custom queue for managing access order. Note that this is tightly integrated with
       * {@code ReferenceEntry}, upon which it relies to perform its linking.
       *
       * <p>Note that this entire implementation makes the assumption that all elements which are in the
       * map are also in this queue, and that all elements not in the queue are not in the map.
       *
       * <p>The benefits of creating our own queue are that (1) we can replace elements in the middle of
       * the queue as part of copyWriteEntry, and (2) the contains method is highly optimized for the
       * current model.
       */
      static final class AccessQueue<K, V> extends AbstractQueue<ReferenceEntry<K, V>> {
        final ReferenceEntry<K, V> head =
            new AbstractReferenceEntry<K, V>() {
    
              @Override
              public long getAccessTime() {
                return Long.MAX_VALUE;
              }
    
              @Override
              public void setAccessTime(long time) {}
    
              ReferenceEntry<K, V> nextAccess = this;
    
              @Override
              public ReferenceEntry<K, V> getNextInAccessQueue() {
                return nextAccess;
              }
    
              @Override
              public void setNextInAccessQueue(ReferenceEntry<K, V> next) {
                this.nextAccess = next;
              }
    
              ReferenceEntry<K, V> previousAccess = this;
    
              @Override
              public ReferenceEntry<K, V> getPreviousInAccessQueue() {
                return previousAccess;
              }
    
              @Override
              public void setPreviousInAccessQueue(ReferenceEntry<K, V> previous) {
                this.previousAccess = previous;
              }
            };
    
        // implements Queue
    
        @Override
        public boolean offer(ReferenceEntry<K, V> entry) {
          // unlink
          connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());
    
          // add to tail
          connectAccessOrder(head.getPreviousInAccessQueue(), entry);
          connectAccessOrder(entry, head);
    
          return true;
        }
    
        @Override
        public ReferenceEntry<K, V> peek() {
          ReferenceEntry<K, V> next = head.getNextInAccessQueue();
          return (next == head) ? null : next;
        }
    
        @Override
        public ReferenceEntry<K, V> poll() {
          ReferenceEntry<K, V> next = head.getNextInAccessQueue();
          if (next == head) {
            return null;
          }
    
          remove(next);
          return next;
        }
        @Override
        public void clear() {
          ReferenceEntry<K, V> e = head.getNextInAccessQueue();
          while (e != head) {
            ReferenceEntry<K, V> next = e.getNextInAccessQueue();
            nullifyAccessOrder(e);
            e = next;
          }

          head.setNextInAccessQueue(head);
          head.setPreviousInAccessQueue(head);
        }
      }
    

    The key method to focus on is offer, which does three things:
    1. Unlinks the entry from its predecessor and successor; this is why every entry has to maintain both forward and backward links.
    2. Appends the entry at the tail of the queue, finding the tail via head.getPreviousInAccessQueue(); evidently this is a circular queue.
    3. The newly added (or newly repositioned) node becomes the tail.

    From this we can conclude that the most recently touched node is always at the tail, and the nodes right after head are the least active; each expiration pass removes timed-out nodes starting from just after head, which poll confirms.

    The write queue works the same way: every write updates the entry's references, its write time, and its position in both queues. Once again, an impressive design. A small demonstration of this recency behavior follows.
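
    A runnable sketch of the effect (my own demo, assuming Guava on the classpath; with such a tiny maximumSize the cache collapses to a single segment per the constructor math above, so the LRU order is deterministic):

        import com.google.common.cache.Cache;
        import com.google.common.cache.CacheBuilder;

        public class RecencyDemo {
          public static void main(String[] args) {
            Cache<String, String> cache = CacheBuilder.newBuilder().maximumSize(2).build();
            cache.put("a", "1");
            cache.put("b", "2");
            cache.getIfPresent("a");                     // touching "a" makes it most recently used
            cache.put("c", "3");                         // over capacity: evicts the LRU entry "b"
            System.out.println(cache.getIfPresent("b")); // null
            System.out.println(cache.getIfPresent("a")); // 1
          }
        }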

    4. evictEntries(e): entry eviction. This only kicks in when a maximum size has been configured for the cache:

        /**
         * Performs eviction if the segment is over capacity. Avoids flushing the entire cache if the
         * newest entry exceeds the maximum weight all on its own.
         *
         * @param newest the most recently added entry
         */
        @GuardedBy("this")
        void evictEntries(ReferenceEntry<K, V> newest) {
          if (!map.evictsBySize()) {
            return;
          }
    
          drainRecencyQueue();
    
          // If the newest entry by itself is too heavy for the segment, don't bother evicting
          // anything else, just that
          if (newest.getValueReference().getWeight() > maxSegmentWeight) {
            if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
              throw new AssertionError();
            }
          }
    
          while (totalWeight > maxSegmentWeight) {
            ReferenceEntry<K, V> e = getNextEvictable();
            if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
              throw new AssertionError();
            }
          }
        }
    

    Several things happen here: first check whether size-based eviction is enabled at all; then drain the recency queue; then, if the newly added entry is too heavy all by itself, remove it directly. Finally, while the total weight still exceeds the cap, keep removing the least recently used entries until the weight falls below the limit.

        // TODO(fry): instead implement this with an eviction head
        @GuardedBy("this")
        ReferenceEntry<K, V> getNextEvictable() {
          for (ReferenceEntry<K, V> e : accessQueue) {
            int weight = e.getValueReference().getWeight();
            if (weight > 0) {
              return e;
            }
          }
          throw new AssertionError();
        }
    

    The potentially confusing notion here is weight. If nothing is configured, every entry has weight 1 and the cap is simply maxSize. But Guava also lets you define your own weights, in which case the cap is maxWeight; see the initialization section above. A sketch of a custom weigher follows.
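
    A minimal sketch of the Weigher API, building a byte-size-bounded cache (the cap is total weight, not an entry count):

        Cache<String, byte[]> cache = CacheBuilder.newBuilder()
            .maximumWeight(1024 * 1024) // cap the total weight at ~1 MB
            .weigher(new Weigher<String, byte[]>() {
              @Override
              public int weigh(String key, byte[] value) {
                return value.length; // each entry weighs its payload size
              }
            })
            .build();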

    5. removalListener: as seen above, when an entry is overwritten a notification is enqueued, and the finally block then runs a round of processing:

    
        /**
       * Notifies listeners that an entry has been automatically removed due to expiration, eviction, or
       * eligibility for garbage collection. This should be called every time expireEntries or
       * evictEntry is called (once the lock is released).
       */
      void processPendingNotifications() {
        RemovalNotification<K, V> notification;
        while ((notification = removalNotificationQueue.poll()) != null) {
          try {
            removalListener.onRemoval(notification);
          } catch (Throwable e) {
            logger.log(Level.WARNING, "Exception thrown by removal listener", e);
          }
        }
      }
    

    So to keep put cheap, the notification work is deferred in a quasi-asynchronous fashion and performed after the lock is released, which avoids blocking other puts.
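
    For completeness, a sketch of registering a listener (note it runs on the calling thread after unlock; wrapping it with RemovalListeners.asynchronous(listener, executor) moves the work onto an executor instead):

        RemovalListener<String, String> listener = new RemovalListener<String, String>() {
          @Override
          public void onRemoval(RemovalNotification<String, String> notification) {
            System.out.println(notification.getKey() + " removed: " + notification.getCause());
          }
        };
        Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(100)
            .removalListener(listener)
            .build();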

    That completes the analysis of Guava's put, which is complex indeed. Now for the get side:

        // LoadingCache methods
        // a proxy over the local cache
        @Override
        public V get(K key) throws ExecutionException {
          return localCache.getOrLoad(key);
        }

      /**
       * Gets the value for the key, loading it if nothing is cached.
       * @param key
       * @return
       * @throws ExecutionException
       */
      V getOrLoad(K key) throws ExecutionException {
        return get(key, defaultLoader);
      }

      V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
        int hash = hash(checkNotNull(key)); // hash -> rehash (smearing)
        return segmentFor(hash).get(key, hash, loader);
      }

      // loading
        // fetch the value for the given key; reads take no lock
        V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
          // neither key nor loader may be null
          checkNotNull(key);
          checkNotNull(loader);

          try {
            if (count != 0) { // read-volatile: the volatile read gives visibility; if the segment is empty, load directly
              // don't call getLiveEntry, which would ignore loading values
              ReferenceEntry<K, V> e = getEntry(key, hash);
              // a non-null entry means the mapping may still be alive
              if (e != null) {
                long now = map.ticker.read(); // the current time, used to judge liveness
                V value = getLiveValue(e, now);
                // a non-null live value can be returned without loading
                if (value != null) {
                  recordRead(e, now); // update accessTime and append to the recency queue
                  statsCounter.recordHits(1); // record a hit, feeding hit-rate statistics
                  // if it is time to refresh, do so; otherwise return the current value
                  return scheduleRefresh(e, key, hash, value, now, loader);
                }
                // the value is null; if a load is in flight, wait for its result
                ValueReference<K, V> valueReference = e.getValueReference();
                if (valueReference.isLoading()) {
                  return waitForLoadingValue(e, key, valueReference);
                }
              }
            }
            // nothing usable found: fall through to the locked get-or-load
            // at this point e is either null or expired;
            return lockedGetOrLoad(key, hash, loader);
          } catch (ExecutionException ee) {
            Throwable cause = ee.getCause();
            if (cause instanceof Error) {
              throw new ExecutionError((Error) cause);
            } else if (cause instanceof RuntimeException) {
              throw new UncheckedExecutionException(cause);
            }
            throw ee;
          } finally {
            postReadCleanup(); // a cleanup pass runs after every put and get
          }
        }
    
    

    The get implementation follows the same idea as JDK 1.6's ConcurrentHashMap: puts take the lock, while gets rely on volatile semantics.
    The main steps are:

    1. Get the entry. If it exists, read its value; a non-null value proves the mapping is still live, so decide whether a refresh is due and return. Otherwise check whether the value reference is currently loading, and if so wait for the load to finish.
    2. If no entry is found, or the value is null and nothing is loading, fall into lockedGetOrLoad(), which is where the heavy lifting happens:
        V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
          ReferenceEntry<K, V> e;
          ValueReference<K, V> valueReference = null;
          LoadingValueReference<K, V> loadingValueReference = null;
          boolean createNewEntry = true;

          lock(); // lock, because the data structure may be mutated
          try {
            // re-read ticker once inside the lock
            long now = map.ticker.read();
            preWriteCleanup(now); // drain the reference queues and expired access/write queue entries; this counts as a write

            int newCount = this.count - 1;
            AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
            int index = hash & (table.length() - 1);
            ReferenceEntry<K, V> first = table.get(index);
            // locate the target entry
            for (e = first; e != null; e = e.getNext()) {
              K entryKey = e.getKey();
              if (e.getHash() == hash
                  && entryKey != null
                  && map.keyEquivalence.equivalent(key, entryKey)) {
                valueReference = e.getValueReference();
                // already loading: don't create a new entry
                if (valueReference.isLoading()) {
                  createNewEntry = false;
                } else {
                  V value = valueReference.get();
                  if (value == null) { // probably garbage-collected: enqueue a COLLECTED notification
                    enqueueNotification(
                        entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
                  } else if (map.isExpired(e, now)) { // or it may have expired
                    // This is a duplicate check, as preWriteCleanup already purged expired
                    // entries, but let's accommodate an incorrect expiration queue.
                    enqueueNotification(
                        entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
                  } else { // already loaded by a concurrent writer: return it
                    recordLockedRead(e, now);
                    statsCounter.recordHits(1);
                    // we were concurrent with loading; don't consider refresh
                    return value;
                  }
                  // drop the stale queue references; a fresh entry is created below
                  // immediately reuse invalid entries
                  writeQueue.remove(e);
                  accessQueue.remove(e);
                  this.count = newCount; // write-volatile
                }
                break;
              }
            }
            // create the new entry, which carries no value yet
            if (createNewEntry) {
              loadingValueReference = new LoadingValueReference<K, V>();

              if (e == null) {
                e = newEntry(key, hash, first);
                e.setValueReference(loadingValueReference);
                table.set(index, e);
              } else {
                e.setValueReference(loadingValueReference);
              }
            }
          } finally {
            unlock();
            postWriteCleanup();
          }

          if (createNewEntry) {
            try {
              // Synchronizes on the entry to allow failing fast when a recursive load is
              // detected. This may be circumvented when an entry is copied, but will fail fast most
              // of the time.
              synchronized (e) {
                return loadSync(key, hash, loadingValueReference, loader);
              }
            } finally {
              statsCounter.recordMisses(1);
            }
          } else {
            // The entry already exists. Wait for loading.
            return waitForLoadingValue(e, key, valueReference);
          }
        }
    

    First, why take the lock? There are two reasons:

    1. A load counts as a write that mutates the data structure, so it must be locked.
    2. The lock prevents cache breakdown: only one thread gets to load a given key. (JVM-level protection, of course, not distributed.)

    Because this counts as a write, preWriteCleanup runs first. The entry is then located by key. If it is found and currently loading, no new entry is created and we simply wait for the load to finish. Otherwise, if its value is null or expired, a new entry has to be created; if neither, the value has already been loaded, so the access queue is updated and the value returned.
    Next, the stale references are removed from the access and write queues and a new entry is created. This part is interesting:

       // at most one of loadSync/loadAsync may be called for any given LoadingValueReference
        // synchronous load
        V loadSync(
            K key,
            int hash,
            LoadingValueReference<K, V> loadingValueReference,
            CacheLoader<? super K, V> loader)
            throws ExecutionException {
          ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
          return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
        }
    

    A LoadingValueReference is created here; this is exactly the "is loading" state checked earlier. While it is set, one thread is updating the cache and the other threads simply wait. A small demonstration:
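
    A runnable sketch of this single-flight behavior (my own demo, assuming Guava on the classpath): two threads get the same missing key concurrently, yet the loader runs only once.

        import java.util.concurrent.ExecutionException;
        import java.util.concurrent.atomic.AtomicInteger;

        import com.google.common.cache.CacheBuilder;
        import com.google.common.cache.CacheLoader;
        import com.google.common.cache.LoadingCache;

        public class SingleFlightDemo {
          static final AtomicInteger loads = new AtomicInteger();

          public static void main(String[] args) throws Exception {
            final LoadingCache<String, String> cache =
                CacheBuilder.newBuilder().build(new CacheLoader<String, String>() {
                  @Override
                  public String load(String key) throws Exception {
                    loads.incrementAndGet();
                    Thread.sleep(100); // simulate an expensive load
                    return "value-" + key;
                  }
                });

            Runnable task = new Runnable() {
              @Override
              public void run() {
                try {
                  cache.get("k"); // concurrent callers block on the same in-flight load
                } catch (ExecutionException e) {
                  throw new RuntimeException(e);
                }
              }
            };
            Thread t1 = new Thread(task);
            Thread t2 = new Thread(task);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(loads.get()); // prints 1: only one thread ran the loader
          }
        }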

    We can also see that asynchronous refresh is supported:

        ListenableFuture<V> loadAsync(
            final K key,
            final int hash,
            final LoadingValueReference<K, V> loadingValueReference,
            CacheLoader<? super K, V> loader) {
          final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
          loadingFuture.addListener(
              new Runnable() {
                @Override
                public void run() {
                  try {
                    getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
                  } catch (Throwable t) {
                    logger.log(Level.WARNING, "Exception thrown during refresh", t);
                    loadingValueReference.setException(t);
                  }
                }
              },
              directExecutor());
          return loadingFuture;
        }
    

    I'll skip the rest of the update logic.
    From the above we can see that every get updates the access queue, that concurrent updates cause only a single thread to load the data, and that a get on a missing key also triggers a load; the synchronization around loading solves the cache-breakdown problem. Guava Cache really is cleverly designed. The refresh path is typically wired up as shown below.
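
    A sketch of the canonical wiring for background refresh (expensiveLookup and refreshExecutor are hypothetical placeholders; overriding reload is what drives the loadAsync path shown above):

        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(new CacheLoader<String, String>() {
              @Override
              public String load(String key) {
                return expensiveLookup(key); // hypothetical lookup
              }

              @Override
              public ListenableFuture<String> reload(final String key, String oldValue) {
                // refresh in the background; readers keep seeing oldValue until it completes
                ListenableFutureTask<String> task =
                    ListenableFutureTask.create(new Callable<String>() {
                      @Override
                      public String call() {
                        return expensiveLookup(key);
                      }
                    });
                refreshExecutor.execute(task); // hypothetical executor
                return task;
              }
            });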

    Guava has one more fun feature, asMap(). Guava Cache feels like a Map, yet isn't quite one, so a method is provided to expose it as a Map view.
    Let's look at asMap():

        @Override
        public ConcurrentMap<K, V> asMap() {
          return localCache;
        }
    

    It just returns the localCache, typed as ConcurrentMap. Looking at LocalCache's inheritance:

    @GwtCompatible(emulated = true)
    class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
    

    So it really is deeply related to Map: LocalCache itself is a ConcurrentMap. We normally can't call these Map methods, though, because all we hold is the LoadingCache. Through asMap we obtain the LocalCache, but only its Map interface; none of the automatic-loading machinery is reachable that way.
    As the official wiki puts it (screenshot omitted): the asMap view contains the entries currently loaded in the cache, and its methods never cause new entries to be loaded.
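
    A quick sketch of the distinction, reusing a LoadingCache whose loader returns "value-" + key (as in the single-flight demo above):

        ConcurrentMap<String, String> view = cache.asMap();
        System.out.println(view.get("a"));           // null: the view never invokes the loader
        System.out.println(cache.getUnchecked("a")); // "value-a": loaded through the cache
        System.out.println(view.get("a"));           // "value-a": the view sees loaded entries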

    That covers all of the core source code. It was a bit of a slog; source code just demands that you settle down and read it carefully, and the payoff is big.

    Since this article is quite long, corrections are welcome if anything is off. Finally, Merry Christmas to this weary coder, myself.

