Caffeine 知识扩充

作者: 右耳菌 | 来源:发表于2022-06-12 15:40 被阅读0次

    一、常用的缓存淘汰算法

    缓存淘汰算法的作用是在有限的资源内,尽可能识别出哪些数据在短时间会被重复利用,从而提高缓存的命中率。常用的缓存淘汰算法有LRU、LFU、FIFO等。

    • LRU (Least Recently Used):算法认为最近访问过的数据将来被访问的几率也更高。
    • LFU (Least Frequently Used):算法根据数据的历史访问频率来淘汰数据,其核心思想是“如果数据过去被访问多次,那么将来被访问的频率也更高”。

    二、淘汰算法W-TinyLFU

    • caffeine 淘汰算法
      Caffeine采用了一种结合LRU、LFU优点的算法:W-TinyLFU,其特点:高命中率、低内存占用。在搞懂W-TinyLFU算法之前,首先了解一下TinyLFU算法:TinyLFU是一种为了解决传统LFU算法空间存储比较大的问题LFU算法,它可以在较大访问量的场景下近似的替代LFU的数据统计部分,它的原理有些类似BloomFilter,也可以查看 caffeine 中的 FrequencySketch
    FrequencySketch

    更多内容,可以

    如上图所示,W-TinyLFU分为两个大类,分别是窗口缓存、主缓存:

    • 窗口缓存占用比例大小为 1%左右,主缓存占用百分之99%。
    • caffeine 可以根据工作负载特性,动态分配窗口缓存和主缓存空间的大小,具体的分配原则大致如下新增数据频繁,大窗口更加受欢迎;如果新增频率偏小,小窗口更受欢迎。
    • 主缓存分为两个部分,一个是存放热数据(hot items)的部分(A2 protected segment),占用了80%,一个是存放冷数据(non hot items)的部分(A1 probation segment),占用20%。数据可以在这两个部分之间进行互相的转移。
    • 淘汰过程
    • 首先数据先放入窗口缓存中
    • 如果窗口缓存满了,则把数据淘汰,放入到A1中
    • 如果A2也满了,那么就会使用窗口缓存中的候选和A1 probation segment中的候选进行对比淘汰。Caffeine中具体的PK逻辑可以查看BoundedLocalCache下的admit方法
     /**
      * Determines if the candidate should be accepted into the main space, as determined by its
      * frequency relative to the victim. A small amount of randomness is used to protect against hash
      * collision attacks, where the victim's frequency is artificially raised so that no new entries
      * are admitted.
      *
      * @param candidateKey the key for the entry being proposed for long term retention
      * @param victimKey the key for the entry chosen by the eviction policy for replacement
      * @return if the candidate should be admitted and the victim ejected
      */
     @GuardedBy("evictionLock")
     boolean admit(K candidateKey, K victimKey) {
       int victimFreq = frequencySketch().frequency(victimKey);
       int candidateFreq = frequencySketch().frequency(candidateKey);
       if (candidateFreq > victimFreq) {
         return true;
       } else if (candidateFreq <= 5) {
         // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
         // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
         // candidate reduces the number of random acceptances to minimize the impact on the hit rate.
         return false;
       }
       int random = ThreadLocalRandom.current().nextInt();
       return ((random & 127) == 0);
     }
    

    二、Caffenie的核心类图

    - Caffenie的核心类图

    三、Caffeine缓存的分类

    Caffeine缓存的分类

    四、Caffeine 操作的原子性

    • load
    • put
    • invalidate

    caffeine的load、put和invalidate操作都是原子的,这个意思是这3个操作是互斥的,load和put是不能同时执行的,load和invalidate也是不能同时执行的。

    package cn.lazyfennec.caffeine;
    
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;
    import org.junit.Test;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @Author: Neco
     * @Description:
     * @Date: create in 2022/6/12 15:18
     */
    public class CaffeineTest {
        @Test
        public void test() throws Exception {
            AtomicInteger atomicInteger=new AtomicInteger();
            LoadingCache<String, String> cache = Caffeine.newBuilder().maximumSize(3).build(key -> {
                Thread.sleep(1000);
                return atomicInteger.incrementAndGet()+"";
            });
    
            cache.get("test");
            cache.invalidate("test");
    
            new Thread() {
                @Override
                public void run() {
                    cache.get("test");
    
                }
            }.start();
    
            new Thread() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    long start = System.currentTimeMillis();
                    cache.invalidate("test");
                    System.out.println("use ms:"+(System.currentTimeMillis() - start));
                }
            }.start();
    
            Thread.sleep(1200);
            System.out.println("========" + cache.asMap());
            System.out.println("========" + cache.get("test"));
        }
    }
    
    • 运行结果
    use ms:797
    ========{}
    ========3
    

    五、Caffeine 缓存过期策略解析

    可以查看类scheduleDrainBuffers中的afterWriteafterRead方法

    • afterWrite
      /**
       * Performs the post-processing work required after a write.
       *
       * @param task the pending operation to be applied
       */
      void afterWrite(Runnable task) {
        if (buffersWrites()) {
          for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
            if (writeBuffer().offer(task)) {
              scheduleAfterWrite();
              return;
            }
            scheduleDrainBuffers();
          }
    
          // The maintenance task may be scheduled but not running due to all of the executor's threads
          // being busy. If all of the threads are writing into the cache then no progress can be made
          // without assistance.
          try {
            performCleanUp(task);
          } catch (RuntimeException e) {
            logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e);
          }
        } else {
          scheduleAfterWrite();
        }
      }
    
    • afterRead
      /**
       * Performs the post-processing work required after a read.
       *
       * @param node the entry in the page replacement policy
       * @param now the current time, in nanoseconds
       * @param recordHit if the hit count should be incremented
       */
      void afterRead(Node<K, V> node, long now, boolean recordHit) {
        if (recordHit) {
          statsCounter().recordHits(1);
        }
    
        boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
        if (shouldDrainBuffers(delayable)) {
          scheduleDrainBuffers();
        }
        refreshIfNeeded(node, now);
      }
    

    可以知道其中有一个scheduleDrainBuffers方法,该方法的作用就是过期任务调度的。

      /**
       * Attempts to schedule an asynchronous task to apply the pending operations to the page
       * replacement policy. If the executor rejects the task then it is run directly.
       */
      void scheduleDrainBuffers() {
        if (drainStatus() >= PROCESSING_TO_IDLE) {
          return;
        }
        if (evictionLock.tryLock()) {
          try {
            int drainStatus = drainStatus();
            if (drainStatus >= PROCESSING_TO_IDLE) {
              return;
            }
            lazySetDrainStatus(PROCESSING_TO_IDLE);
            executor().execute(drainBuffersTask);
          } catch (Throwable t) {
            logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
            maintenance(/* ignored */ null);
          } finally {
            evictionLock.unlock();
          }
        }
      }
    

    又看到drainBuffersTask字样,查看可以知道这是一个PerformCleanupTask,具体查看该类细节,可以看见以下内容

    PerformCleanupTask

    而其中的perfermCleanUp的具体细节如下

      /**
       * Performs the maintenance work, blocking until the lock is acquired. Any exception thrown, such
       * as by {@link CacheWriter#delete}, is propagated to the caller.
       *
       * @param task an additional pending task to run, or {@code null} if not present
       */
      void performCleanUp(@Nullable Runnable task) {
        evictionLock.lock();
        try {
          maintenance(task);
        } finally {
          evictionLock.unlock();
        }
      }
    

    再对maintenance方法进行查看

      /**
       * Performs the pending maintenance work and sets the state flags during processing to avoid
       * excess scheduling attempts. The read buffer, write buffer, and reference queues are
       * drained, followed by expiration, and size-based eviction.
       *
       * @param task an additional pending task to run, or {@code null} if not present
       */
      @GuardedBy("evictionLock")
      void maintenance(@Nullable Runnable task) {
        lazySetDrainStatus(PROCESSING_TO_IDLE);
    
        try {
          drainReadBuffer();
    
          drainWriteBuffer();
          if (task != null) {
            task.run();
          }
    
          drainKeyReferences();
          drainValueReferences();
    
          expireEntries();
          evictEntries();
        } finally {
          if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
            lazySetDrainStatus(REQUIRED);
          }
        }
      }
    

    可见以上方法中,分别执行了drainReadBuffer,drainWriteBuffer,drainKeyReferences,drainValueReferences,expireEntries,evictEntries,这些方法都是对缓存进行回收的方法。


    六、总结

    Caffeine是一个非常大的内容,更多的相关知识,可以查看相关的文档或者源码。


    如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~

    相关文章

      网友评论

        本文标题:Caffeine 知识扩充

        本文链接:https://www.haomeiwen.com/subject/axxwmrtx.html