初探guava cache实现

作者: 王二北 | 来源:发表于2019-08-11 22:21 被阅读10次

    王二北原创,转载请标明出处:来自王二北

    1、简单介绍guava cache

    guava cache是Google guava中提供的一款轻量级缓存组件,最近项目中用到了guava cache做本地缓存,之所以选择guava cache,原因是guava cache够轻量、够简单、key过期和内存管理机制也较为完善,可扩展性强。

    下面是guava cache的一个简单例子:

     LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                    // 当cache中元素个数超过100时,则进行内存回收
                    .maximumSize(100)
                    // 缓存项在给定时间内没有被进行写操作,则回收这个数据项占用内存
                    .expireAfterWrite(5, TimeUnit.SECONDS)
                    // 缓存项在给定时间内没有被访问(读/写操作),则回收这个数据项占用内存
                    .expireAfterAccess(10,TimeUnit.SECONDS) 缓存项在给的时间内没有进行写操作(创建/更新值),则刷新缓存, 调用reload()去重新加载数据
                    .refreshAfterWrite(15, TimeUnit.SECONDS)
                    // 删除监听器,当缓存被删除时,会触发onRemoval()方法
                    // RemovalNotification是删除通知,其中包含移除原因[RemovalCause]、键和值。
                    .removalListener(new RemovalListener<String, String>() {
                        @Override
                        public void onRemoval(RemovalNotification<String, String> notification) {
                            System.out.println("onRemoval execute: key="+notification.getKey()+",value="+notification.getValue()+" was deleted,cause="+notification.getCause().name());
                        }
                    })
    
                    /**
                     * recordStats用来开启Guava Cache的统计功能,用于统计缓存命中率、命中次数等值。
                     * 统计打开后,使用Cache.stats()方法会返回CacheStats对象以提供如下统计信息:
                     * hitRate():缓存命中率;
                     * averageLoadPenalty():加载新值的平均时间,单位为纳秒;
                     * evictionCount():缓存项被回收的总数,不包括显式清除。
                     */
                    .recordStats()
                    .build(
                            // CacheLoader用于处理load, reload等逻辑
                            new CacheLoader<String, String>() {
                                public String load(String key)  {
                                    System.out.println("load .......");
                                     return key + new Date().toString();
                                }
    
                                //重载CacheLoader.reload(K, V)可以扩展刷新时(调用Cache#refresh()方法时)的行为,
                                // 这个方法允许在获取新值时返回旧的值。
                                @Override
                                public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
                                    System.out.println("reload cache:  key="+key+",oldValue="+oldValue);
                                    return super.reload(key, oldValue);
                                }
                            });
    

    2、对guava cache的疑惑

    虽然guava cache很“小”,很轻量,但是在使用的过程中,我一直对其中的一些机制感到疑惑:

    (1)guava cache有几个重要的参数:expireAfterAccess、expireAfterWrite、refreshAfterWrite,第一个和第二个参数从字面上理解,分别表示一个key对应的value,多久没有访问就会过期和多久没有进行写操作就会过期,第三个参数表示写操作多久后进行刷新。 那么这三个参数是如何管理cache中数据的有效性的呢,guava cache是如何通过这三个参数管理数据的呢?

    (2)guava cache官方文档介绍说,guava cache本身并不会开启一个线程去维护缓存, 而是在访问key时顺便检查一下key的有效期,然后该过期的过期,该刷新缓存的刷缓存,那么在多线程访问的情况下,需要注意哪些问题呢?如果要实现主动刷新,该如何实现呢?

    要回答第一个问题,最好从本质上,也就是从源码层面来解答这些问题。

    注:如果你感觉源码比较枯燥,可以直接跳过2.1小节,从2.2小节看起。

    2.1、从源码中探究问题的答案

    首先看LoadindgCache,它是一个接口(又继承了Cache接口),接口中定义了get/getUnchecked/refresh等跟缓存存取相关的方法。

    @GwtCompatible
    public interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V>{
        //xxxx
    }
    

    LoadingCache实现类的结构如下:


    image

    那么我们会用到哪个类呢?回头查看上面的例子,在调用构建器CacheBuilder#build方法时,返回LoadingCache对象,查看一下build方法源码:

    public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
          CacheLoader<? super K1, V1> loader) {
        checkWeightWithWeigher();
        return new LocalCache.LocalLoadingCache<>(this, loader);
      }
    

    可以看到,cache使用的是LocalLoadingCache这个实现类。

    我们平常用的最多的,也就是cache.get()方法,那么就从LocalLoadingCache入手,查看一下其对应的get(xxx)方法:

       // 其get方法调用的是LocalCache的getOrLoad方法
        @Override
        public V get(K key) throws ExecutionException {
          return localCache.getOrLoad(key);
        }
        
        // LocalCache中getOrLoad内调用了get
        V getOrLoad(K key) throws ExecutionException {
            return get(key, defaultLoader);
        }
      
        // get方法内部的实现,看着是不是有点熟悉,有点像concurrentHashMap
        V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
            // 注意,key不可为空
            int hash = hash(checkNotNull(key));
            return segmentFor(hash).get(key, hash, loader);
        }
    

    当看到上面的LocalCache中的V get(K key, CacheLoader<? super K, V> loader)方法时,突然感觉有点眼熟,这个又是hash,又是segment的,怎么感觉和ConcurrentHashMap很像,接着往下看,首先取hash值的方法实现和hashMap等中取hash的方式大同小异,这里不是重点,接着看segmentFor(hash)

     // 通过hash得到Segment数组下标,通过下标得到segment数组的元素
     final Segment<K, V>[] segments;
     Segment<K, V> segmentFor(int hash) {
        // TODO(fry): Lazily create segments?
        return segments[(hash >>> segmentShift) & segmentMask];
      }
    

    Segment是LocalCache的内部类,在创建Sement时,会初始化一个AtomicReferenceArray实现的可并发读写的数组,数组元素是ReferenceEntry:


    image
    // 初始化线程安全的数组,数组元素是ReferenceEntry(也就是链表中的头结点)
     void initTable(AtomicReferenceArray<ReferenceEntry<K, V>> newTable) {
          this.threshold = newTable.length() * 3 / 4; // 0.75
          if (!map.customWeigher() && this.threshold == maxSegmentWeight) {
            // prevent spurious expansion before eviction
            this.threshold++;
          }
          this.table = newTable;
        }
    AtomicReferenceArray<ReferenceEntry<K, V>> newEntryArray(int size) {
          return new AtomicReferenceArray<>(size);
    }
    
    

    ReferenceEntry是一个接口,有基于强引用和弱引用的实现,这里不再展开讲,后面再写一篇去讲,下面是ReferenceEntry的定义的方法,可以看出其也是链表的形式:


    image

    由此可见LoadingCache的内部数据结构和ConcurrentHashMap是类似的。


    image

    接着分析get方法,前面讲到get的内部实现是:

     // get方法内部的实现
        V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
            // 注意,key不可为空
            int hash = hash(checkNotNull(key));
            // 通过hash定位到segment数组的某个Segment元素,然后调用其get方法
            return segmentFor(hash).get(key, hash, loader);
        }
        
        // LocalCache内部类Segment的get方法实现
        V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
          checkNotNull(key);
          checkNotNull(loader);
          try {
            if (count != 0) { // read-volatile
              // 内部也是通过找Entry链数组定位到某个Entry节点
              ReferenceEntry<K, V> e = getEntry(key, hash);
              if (e != null) {
                long now = map.ticker.read();
                // 根据key获取值,这个方法中也会对值进行一些失效数据的清理
                // 另外需要注意的是,这个方法中不会去load数据。
                // 如果过期的值在这一步被清理了,则下面回去load
                V value = getLiveValue(e, now);
                if (value != null) {
                  // 更新key的AccessTime
                  recordRead(e, now);
                  // 记录key命中
                  statsCounter.recordHits(1);
                  // 这个方法中会判断是否到了刷新的时刻,并进行刷新
                  // 具体参考后面的scheduleRefresh()源码
                  return scheduleRefresh(e, key, hash, value, now, loader);
                }
                // 当值不存在时(有可能是前面过期时清理了),则去load数据
                ValueReference<K, V> valueReference = e.getValueReference();
                if (valueReference.isLoading()) {
                  return waitForLoadingValue(e, key, valueReference);
                }
              }
            }
    
            // at this point e is either null or expired;
            return lockedGetOrLoad(key, hash, loader);
          } catch (ExecutionException ee) {
            Throwable cause = ee.getCause();
            if (cause instanceof Error) {
              throw new ExecutionError((Error) cause);
            } else if (cause instanceof RuntimeException) {
              throw new UncheckedExecutionException(cause);
            }
            throw ee;
          } finally {
            postReadCleanup();
          }
        }
    
    

    这里主要关注上面代码中getLiveValue()、scheduleRefresh()、lockedGetOrLoad()和 lockedGetOrLoad()方法:

    先看getLiveValue()方法的源码:

     /**
     * 这个方法会触发一些失效数据的清理,但不会去load数据
     */
     V getLiveValue(ReferenceEntry<K, V> entry, long now) {
          // 下面两步是对那些不是强引用类型的key或value被垃圾回收后,去清理链中对应的entry。这两步体现了guava cache并不会主动去清理失效的数据,而是在访问数据时才被动去清理。
          //( 关于key或value的源码在下面,这里不做深入探讨,如果需要可以自行查看。)
          //比如key是一个弱引用,gc后,key对象被回收了,这个key对于的entry就会被放到一个keyReferenceQueue中,每次get操作时,如果当前key是null,则就触发一次遍历这个keyReferenceQueue,去清理entry链中对于的entry节点。
          if (entry.getKey() == null) {
            tryDrainReferenceQueues();
            return null;
          }
          // value也一样,也是遍历对应的queue
          V value = entry.getValueReference().get();
          if (value == null) {
            tryDrainReferenceQueues();
            return null;
          }
          // 首先调用map.isExpired判断当前entry是否过期,如果已经过期,则触发过期清理,具体的源码在下面
          if (map.isExpired(entry, now)) {
            tryExpireEntries(now);
            return null;
          }
          return value;
        }
    
       // 下面是对应key/value 不是强引用类型时的处理 逻辑
        /**
         * The key reference queue contains entries whose keys have been garbage collected, and which
         * need to be cleaned up internally.
         */
        final @Nullable ReferenceQueue<K> keyReferenceQueue;
    
        /**
         * The value reference queue contains value references whose values have been garbage collected,
         * and which need to be cleaned up internally.
         */
        final @Nullable ReferenceQueue<V> valueReferenceQueue;
        
    /** Cleanup collected entries when the lock is available. */
        void tryDrainReferenceQueues() {
          if (tryLock()) {
            try {
              drainReferenceQueues();
            } finally {
              unlock();
            }
          }
        }
        // 判断key或value是否是非强引用类型
         void drainReferenceQueues() {
          if (map.usesKeyReferences()) {
            drainKeyReferenceQueue();
          }
          if (map.usesValueReferences()) {
            drainValueReferenceQueue();
          }
        }
        
      boolean usesKeyReferences() {
        return keyStrength != Strength.STRONG;
      }
    
      boolean usesValueReferences() {
        return valueStrength != Strength.STRONG;
      }
    
    

    下面是guava cache中关于过期数据的处理过程:

    // 下面是guava cache中判断一个entry是否过期的处理
      boolean isExpired(ReferenceEntry<K, V> entry, long now) {
        checkNotNull(entry);
        // 首先判断accessTime是否设置,如果设置则判断当前是否已达到accessTiMe过期条件
        if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
          return true;
        }
        // 然后判断afterWrite是否已设置,如果设置则判断当前是否已达到writeTime过期条件
        if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
          return true;
        }
        return false;
      }
        
    
       // 尝试清理过期的数据,这一步会加锁
        void tryExpireEntries(long now) {
          if (tryLock()) {
            try {
              expireEntries(now);
            } finally {
              unlock();
              // don't call postWriteCleanup as we're in a read
            }
          }
        }
        
        // 注意每次触发清理时,会清理cache中所有已经过期的数据
        void expireEntries(long now) {
          drainRecencyQueue();
          // 清理时,会遍历两个queue,去判断过期的entry,然后清理掉(这样的好处比遍历整个map去判断过期再清理效率要高很多,也算是空间换时间的一种手段)。
          // 这两个queue,一个是writequeue,它记录了最近被write过的entry(是按write时间排序的)
          // 另一个是accessQueue,它记录了最近被访问过的entry(是按访问时间排序)
          ReferenceEntry<K, V> e;
          while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
            if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
              throw new AssertionError();
            }
          }
          while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
            if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
              throw new AssertionError();
            }
          }
        }
    

    下面是判断是否需要刷新的逻辑

    V scheduleRefresh(
            ReferenceEntry<K, V> entry,
            K key,
            int hash,
            V oldValue,
            long now,
            CacheLoader<? super K, V> loader) {
          // 如果设置了refreshAfterWrite,并且刷新时间点已到,则进行刷新操作,也就是reload
          // 并且,如果有其他线程正在进行load操作(isloading为true),则直接返回旧的值。
          // 这里提前剧透一下,同一时刻只会有一个线程去reload数据,其他线程直接返回。这一点需要和后面要讲的load区分,load是线程阻塞的
          // 那么这个isloading是怎么为true的呢,接着看下面的refresh
          if (map.refreshes()
              && (now - entry.getWriteTime() > map.refreshNanos)
              && !entry.getValueReference().isLoading()) {
              // refresh方法,刷新开始
            V newValue = refresh(key, hash, loader, true);
            if (newValue != null) {
              return newValue;
            }
          }
          // 如果已经有其他线程在加载当前key对应的新数据,当前线程直接返回旧的数据
          return oldValue;
        }
        
        // 下面是实现key对应数据刷新的方法实现
         V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
          // 这一步insertLoadingValueReference()方法就是isloading的关键,具体实现参考下方
          final LoadingValueReference<K, V> loadingValueReference =
              insertLoadingValueReference(key, hash, checkTime);
          if (loadingValueReference == null) {
            return null;
          }
    
          ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
          if (result.isDone()) {
            try {
              return Uninterruptibles.getUninterruptibly(result);
            } catch (Throwable t) {
              // don't let refresh exceptions propagate; error was already logged
            }
          }
          return null;
        }
        
        // 下面方法主要实现了两个功能
        // 1、保证同一时刻只能有一个线程对同一个key进行刷新
        // 2、标记当前key正在loading
        LoadingValueReference<K, V> insertLoadingValueReference(
            final K key, final int hash, boolean checkTime) {
          ReferenceEntry<K, V> e = null;
          // 首先加锁
          lock();
          try {
            long now = map.ticker.read();
            // 这一步也会执行无效数据的清理
            preWriteCleanup(now);
            AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
            int index = hash & (table.length() - 1);
            ReferenceEntry<K, V> first = table.get(index);
    
            // 首先通过key找到对应的entry
            for (e = first; e != null; e = e.getNext()) {
              K entryKey = e.getKey();
              if (e.getHash() == hash
                  && entryKey != null
                  && map.keyEquivalence.equivalent(key, entryKey)) {
                // entry存在,则判断当前entry是否正在被其他线程执行加载数据
                或者当前entry并未过期,则直接返回null
                ValueReference<K, V> valueReference = e.getValueReference();
                if (valueReference.isLoading()
                    || (checkTime && (now - e.getWriteTime() < map.refreshNanos))) {
                  return null;
                }
                
                // 如果没有其他线程正在操作,则创建一个新的LoadingValueReference给这个entry,新LoadingValueReference中的isLoading()方法就只有一行代码,return true.(看下方的源码截图)
                ++modCount;
                LoadingValueReference<K, V> loadingValueReference =
                    new LoadingValueReference<>(valueReference);
                e.setValueReference(loadingValueReference);
                return loadingValueReference;
              }
            }
            // 有可能在进行refresh操作之前,key对应entry已经被清理掉,因此这里会重新创建一个entry,后续会接着加载数据。
            ++modCount;
            LoadingValueReference<K, V> loadingValueReference = new LoadingValueReference<>();
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
            return loadingValueReference;
          } finally {
            unlock();
            postWriteCleanup();
          }
        }
        
        // 下面代码的意思是异步的去加载数据
        // 但是默认情况下,还是同步的去调用load()方法,因此如果要实现真正的异步,需要重新reload()方法去实现
        ListenableFuture<V> loadAsync(
            final K key,
            final int hash,
            final LoadingValueReference<K, V> loadingValueReference,
            CacheLoader<? super K, V> loader) {
          final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
          loadingFuture.addListener(
              new Runnable() {
                @Override
                public void run() {
                  try {
                    getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
                  } catch (Throwable t) {
                    logger.log(Level.WARNING, "Exception thrown during refresh", t);
                    loadingValueReference.setException(t);
                  }
                }
              },
              directExecutor());
          return loadingFuture;
        }
        
        public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
          try {
            stopwatch.start();
            V previousValue = oldValue.get();
            // 如果旧数据为空,则调用load()方法加载数据
            if (previousValue == null) {
              V newValue = loader.load(key);
              return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
            }
            // 否则,调用reload()加载数据
            ListenableFuture<V> newValue = loader.reload(key, previousValue);
            if (newValue == null) {
              return Futures.immediateFuture(null);
            }
            // To avoid a race, make sure the refreshed value is set into loadingValueReference
            // *before* returning newValue from the cache query.
            return transform(
                newValue,
                new com.google.common.base.Function<V, V>() {
                  @Override
                  public V apply(V newValue) {
                    LoadingValueReference.this.set(newValue);
                    return newValue;
                  }
                },
                directExecutor());
          } catch (Throwable t) {
            ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
            if (t instanceof InterruptedException) {
              Thread.currentThread().interrupt();
            }
            return result;
          }
        }
        
      // reload的默认实现就是同步调用load(),因此guava cache官方推荐重新override 这个方法,自己实现多线程异步处理(比如从线程池中取出一个线程去处理)
      // It is recommended that it be overridden with an asynchronous implementation when using CacheBuilder#refreshAfterWrite
      public ListenableFuture<V> reload(K key, V oldValue) throws Exception {
        checkNotNull(key);
        checkNotNull(oldValue);
        return Futures.immediateFuture(load(key));
      }
    

    [图片上传失败...(image-e4a47-1565533251151)]

    2.2、源码分析总结

    guava cache总结
        内部数据结构
            类似于ConcurrentHashMap,Segement数组 + Entry链表数组
        如何管理数据
            1、被动清理。只有当访问数据(比如get操作)时,guavacache才会去清理数据.
            2、清理两方面的数据
                当key或value为非强引用类型的对象被GC回收后,其对应的entry会被清理
                当数据失效时
            3、失效判断
                设置了expiresAfterAccess,并且超过expiresAfterAccess时间没有访问数据(读、写),则数据失效。
                设置了expiresAfterWrite,并且超过expiresAfterWrite时间没有更新数据(写),则数据失效。
                数据失效时,清理数据,并去load数据
            4、数据刷新
                设置了refreshAfterWrite,并且超过refreshAfterWrite时间没有更新数据,则调用reload刷新数据
            5、清理刷新数据流程
                1、首先,访问数据时,如果能通过key找到对应的entry,如果entry对象中对应的key或value为null,则表示是由于gc回收(回收非强引用)导致的,此时会触发对cache中这类数据的主动清理
                2、接着判断通过key得到的entry是否超过expiresAfterAccess,如果是则过期,触发主动清理过期数据的操作
                3、然后判断entry是否超过expiresAfterWrite,如果是则过期,触发清理过程
                4、如果经过上面的操作,数据被清理(返回null),则最后调用load()加载数据。
                5、如果经过123,数据不为空,则判断refreshAfterWrite,如果满足,则调用reload()刷新数据
        并发处理
            load()时, 只会有一个线程去执行load(), 其他线程会被阻塞,直到数据加载成功。
            reload()时,只会有一个线程去执行reload(), 其他线程会返回oldValue()
            guava cache推荐重写reload()方法,内部虽然使用了异步实现(异步回调),但默认实现是同步调用load(),需要自己实现多线程处理(比如在reload中搞一个线程池)
        三个时间参数
            cache的时间参数一般用于控制数据占用空间和数据的实时性
            expiresAfterAccess用于管理数据空间,清理不常用的数据
            expiresAfterWrite和refreshAfterWrite则用于管理数据的实时性
            不同的是当expireAfterWrite过期时,会重新去load()数据,而refreshAfterWrite过期时,会reload()数据
    
    image

    通过上面对guava cache源码的浅读,解决了我的第一个问题和第二个问题前半部分。
    那么如何实现主动刷新呢?
    可以使用HashedWheelTimer来搞一下,写的有点累了,这里就不介绍HashedWheel了,可以自己百度一下,实现很巧妙,也很实用,尤其是在任务量比较大的场景:


    image

    相关文章

      网友评论

        本文标题:初探guava cache实现

        本文链接:https://www.haomeiwen.com/subject/aividctx.html