美文网首页
Caffeine缓存

Caffeine缓存

作者: 呆叔么么 | 来源:发表于2019-12-27 14:21 被阅读0次

Caffeine是一种高性能的缓存库,是基于Java 8的最佳(最优)缓存框架。
Cache(缓存),基于Google GuavaCaffeine提供一个内存缓存,大大改善了设计Guava's cacheConcurrentLinkedHashMap 的体验。

LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
  .maximumSize(10_000) // 缓存大小
  .expireAfterWrite(5, TimeUnit.MINUTES) // 有效时间
  .refreshAfterWrite(1, TimeUnit.MINUTES) // 刷新的间隔
  .build(key -> createExpensiveGraph(key)); // 同步加载

缓存类似于ConcurrentMap,但二者并不完全相同。最基本的区别是,ConcurrentMap保存添加到其中的所有元素,直到显式地删除它们。另一方面,缓存通常配置为自动删除条目,以限制其内存占用。在某些情况下,LoadingCacheAsyncLoadingCache可能很有用,因为它是自动缓存加载的。

Caffeine提供了灵活的结构来创建缓存,并且有以下特性:

自动加载条目到缓存中,可选异步方式
可以基于大小剔除
可以设置过期时间,时间可以从上次访问或上次写入开始计算
异步刷新
keys自动包装在弱引用中
values自动包装在弱引用或软引用中
条目剔除通知
缓存访问统计

1. 加载/填充

Caffeine提供以下四种类型的加载策略:

1.1. Manual(手动加载)

Cache<Key, Graph> cache = Caffeine.newBuilder()
  .expireAfterWrite(10, TimeUnit.MINUTES)
  .maximumSize(10_000)
  .build();
  // 查找条目,如果没有找到则为null
  Graph graph = cache.getIfPresent(key);
// 根据Key查询一个缓存,如果没有调用createExpensiveGraph方法,并将返回值保存到缓存。
// 如果该方法返回Null则manualCache.get返回null,如果该方法抛出异常则manualCache.get抛出异常
  graph = cache.get(key, k -> createExpensiveGraph(key));
// 将一个值放入缓存,如果以前有值就覆盖以前的值
  cache.put(key, graph);
// 删除一个缓存
  cache.invalidate(key);

Cache接口可以显式地控制检索、更新和删除条目。
我们可以通过cache.getIfPresent(key) 方法来获取一个key的值,通过cache.put(key, value)方法显示的将数控放入缓存,但是这样子会覆盖缓原来key的数据。更加建议使用cache.get(key,k - > value) 的方式,get 方法将一个参数为 keyFunction (createExpensiveGraph) 作为参数传入。如果缓存中不存在该键,则调用这个 Function 函数,并将返回值作为该缓存的值插入缓存中。get 方法是以阻塞方式执行调用,即使多个线程同时请求该值也只会调用一次Function方法。这样可以避免与其他线程的写入竞争,这也是为什么使用 get 优于 getIfPresent的原因。

1.2. Loading(同步加载)

LoadingCache<Key, Graph> cache = Caffeine.newBuilder()
  .maximumSize(10_000)
  .expireAfterWrite(10, TimeUnit.MINUTES)
  .build(key -> createExpensiveGraph(key));

  String key = "name1";
  // 采用同步方式去获取一个缓存和上面的手动方式是一个原理。在build Cache的时候会提供一个createExpensiveGraph函数。
  // 查询并在缺失的情况下使用同步的方式来构建一个缓存
  Object graph = loadingCache.get(key);

  // 获取组key的值返回一个Map
  List<String> keys = new ArrayList<>();
  keys.add(key);
  Map<String, Object> graphs = loadingCache.getAll(keys);

LoadingCache通过关联一个CacheLoader来构建Cache
通过LoadingCachegetAll方法,可以批量查询 。默认情况下,getAll将会对缓存中没有值的key分别调用CacheLoader.load方法来构建缓存的值。我们可以重写CacheLoader.loadAll方法来提高getAll的效率。

注意:您可以编写一个CacheLoader.loadAll来实现为特别请求的key加载值。例如,如果计算某个组中的任何键的值将为该组中的所有键提供值,则loadAll可能会同时加载该组的其余部分。

1.3. 异步加载(Asynchronously Loading)

AsyncLoadingCache<String, Object> asyncLoadingCache = Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            // 或者:使用包装为异步的同步计算进行构建
            .buildAsync(key -> createExpensiveGraph(key));
            // 或者:使用返回未来的异步计算构建
            // .buildAsync((key, executor) -> createExpensiveGraphAsync(key, executor));

 String key = "name1";

// 查询并在缺失的情况下使用异步的方式来构建缓存
CompletableFuture<Object> graph = asyncLoadingCache.get(key);
// 查询一组缓存并在缺失的情况下使用异步的方式来构建缓存
List<String> keys = new ArrayList<>();
keys.add(key);
CompletableFuture<Map<String, Object>> graphs = asyncLoadingCache.getAll(keys);
// 异步转同步
loadingCache = asyncLoadingCache.synchronous();

AsyncLoadingCache是继承自LoadingCache类的,异步加载使用Executor去调用方法并返回一个CompletableFuture。异步加载缓存使用了响应式编程模型。

如果要以同步方式调用时,应提供CacheLoader。要以异步表示时,应该提供一个AsyncCacheLoader,并返回一个CompletableFuture

synchronous()这个方法返回了一个LoadingCacheView视图,LoadingCacheView也继承自LoadingCache。调用该方法后就相当于你将一个异步加载的缓存AsyncLoadingCache转换成了一个同步加载的缓存LoadingCache

默认使用ForkJoinPool.commonPool()来执行异步线程,但是我们可以通过Caffeine.executor(Executor) 方法来替换线程池。

2. 剔除

Caffeine提供三种剔除方式:基于大小基于时间基于引用

2.1基于大小(size-based)

基于大小驱逐,有两种方式:一种是基于缓存大小,一种是基于权重。

// Evict based on the number of entries in the cache
// 根据缓存的计数进行驱逐
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    .build(key -> createExpensiveGraph(key));

// Evict based on the number of vertices in the cache
// 根据缓存的权重来进行驱逐(权重只是用于确定缓存大小,不会用于决定该缓存是否被驱逐)
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumWeight(10_000)
    .weigher((Key key, Graph graph) -> graph.vertices().size())
    .build(key -> createExpensiveGraph(key));

如果缓存的条目数量不会超过某个值,那么可以使用Caffeine.maximumSize(long)。如果超过这个值,则会剔除很久没有被访问过或者不经常使用的那个条目。

如果,不同的条目有不同的权重值的话,那么你可以用Caffeine.weigher(Weigher)来指定一个权重函数,并且使用Caffeine.maximumWeight(long)来设定最大的权重值。

简单的来说,要么限制缓存条目的数量,要么限制缓存条目的权重值,二者取其一。限制数量很好理解,限制权重的话首先你得提供一个函数来设定每个条目的权重值是多少,然后才能显示最大的权重是多少。

2.2 基于时间(Time-based)

// Evict based on a fixed expiration policy
// 基于固定的到期策略进行退出
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .expireAfterAccess(5, TimeUnit.MINUTES)
    .build(key -> createExpensiveGraph(key));
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .expireAfterWrite(10, TimeUnit.MINUTES)
    .build(key -> createExpensiveGraph(key));

// Evict based on a varying expiration policy
// 基于不同的到期策略进行退出
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .expireAfter(new Expiry<Key, Graph>() {
      @Override
      public long expireAfterCreate(Key key, Graph graph, long currentTime) {
        // Use wall clock time, rather than nanotime, if from an external resource
        long seconds = graph.creationDate().plusHours(5)
            .minus(System.currentTimeMillis(), MILLIS)
            .toEpochSecond();
        return TimeUnit.SECONDS.toNanos(seconds);
      }
      
      @Override
      public long expireAfterUpdate(Key key, Graph graph, 
          long currentTime, long currentDuration) {
        return currentDuration;
      }
      
      @Override
      public long expireAfterRead(Key key, Graph graph,
          long currentTime, long currentDuration) {
        return currentDuration;
      }
    })
    .build(key -> createExpensiveGraph(key));

Caffeine提供了三种定时驱逐策略:

expireAfterAccess(long, TimeUnit):在最后一次访问或者写入后开始计时,在指定的时间后过期。假如一直有请求访问该key,那么这个缓存将一直不会过期。
expireAfterWrite(long, TimeUnit): 在最后一次写入缓存后开始计时,在指定的时间后过期。
expireAfter(Expiry): 自定义策略,过期时间由Expiry实现独自计算。
缓存的删除策略使用的是惰性删除和定时删除。这两个删除策略的时间复杂度都是O(1)。
测试定时驱逐不需要等到时间结束。我们可以使用Ticker接口和Caffeine.ticker(Ticker)方法在缓存生成器中指定时间源,而不必等待系统时钟。

FakeTicker ticker = new FakeTicker(); // Guava's testlib
Cache<Key, Graph> cache = Caffeine.newBuilder()
    .expireAfterWrite(10, TimeUnit.MINUTES)
    .executor(Runnable::run)
    .ticker(ticker::read)
    .maximumSize(10)
    .build();

cache.put(key, graph);
ticker.advance(30, TimeUnit.MINUTES)
assertThat(cache.getIfPresent(key), is(nullValue());

2.3基于引用(reference-based)

强引用,软引用,弱引用概念说明请点击连接,这里说一下各各引用的区别:

Java4种引用的级别由高到低依次为:强引用 > 软引用 > 弱引用 > 虚引用

引用类型 被垃圾回收时间 用途 生存时间
强引用 从来不会 对象的一般状态 JVM停止运行时终止
软引用 在内存不足时 对象缓存 内存不足时终止
弱引用 在垃圾回收时 对象缓存 gc运行后终止
虚引用 Unknown Unknown Unknown
// Evict when neither the key nor value are strongly reachable
// 当key和value都没有引用时驱逐缓存
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .weakKeys()
    .weakValues()
    .build(key -> createExpensiveGraph(key));

// Evict when the garbage collector needs to free memory
// 当垃圾收集器需要释放内存时驱逐
LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .softValues()
    .build(key -> createExpensiveGraph(key));

我们可以将缓存的驱逐配置成基于垃圾回收器。为此,我们可以将key 和 value 配置为弱引用或只将值配置成软引用。

注意:AsyncLoadingCache不支持弱引用和软引用。

Caffeine.weakKeys()使用弱引用存储key。如果没有其他地方对该key有强引用,那么该缓存就会被垃圾回收器回收。由于垃圾回收器只依赖于身份(identity)相等,因此这会导致整个缓存使用身份 (==) 相等来比较 key,而不是使用 equals()。

Caffeine.weakValues() 使用弱引用存储value。如果没有其他地方对该value有强引用,那么该缓存就会被垃圾回收器回收。由于垃圾回收器只依赖于身份(identity)相等,因此这会导致整个缓存使用身份 (==) 相等来比较 key,而不是使用 equals()。

Caffeine.softValues() 使用软引用存储value。当内存满了过后,软引用的对象以将使用最近最少使用(least-recently-used ) 的方式进行垃圾回收。由于使用软引用是需要等到内存满了才进行回收,所以我们通常建议给缓存配置一个使用内存的最大值。 softValues() 将使用身份相等(identity) (==) 而不是equals() 来比较值。

注意Caffeine.weakValues()Caffeine.softValues()不可以一起使用。

3. 删除

术语:

eviction  指受策略影响而被删除
invalidation  值被调用者手动删除
removal  值因eviction或invalidation而发生的一种行为

3.1. 明确地删除

// individual key
cache.invalidate(key)
// bulk keys
cache.invalidateAll(keys)
// all keys
cache.invalidateAll()

3.2. 监听器

Cache<Key, Graph> graphs = Caffeine.newBuilder()
    .removalListener((Key key, Graph graph, RemovalCause cause) ->
        System.out.printf("Key %s was removed (%s)%n", key, cause))
    .build();

您可以通过Caffeine.removalListener(RemovalListener) 为缓存指定一个删除侦听器,以便在删除数据时执行某些操作。 RemovalListener可以获取到keyvalueRemovalCause(删除的原因)。

删除侦听器的里面的操作是使用Executor来异步执行的。默认执行程序是ForkJoinPool.commonPool(),可以通过Caffeine.executor(Executor)覆盖。当操作必须与删除同步执行时,请改为使用CacheWriteCacheWrite将在下面说明。

注意:由RemovalListener抛出的任何异常都会被记录(使用Logger)并不会抛出。

4. 刷新

LoadingCache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    // 指定在创建缓存或者最近一次更新缓存后经过固定的时间间隔,刷新缓存
    .refreshAfterWrite(1, TimeUnit.MINUTES)
    .build(key -> createExpensiveGraph(key));

刷新和驱逐是不一样的。刷新的是通过LoadingCache.refresh(key)方法来指定,并通过调用CacheLoader.reload方法来执行,刷新key会异步地为这个key加载新的value,并返回旧的值(如果有的话)。驱逐会阻塞查询操作直到驱逐作完成才会进行其他操作。

expireAfterWrite不同的是,refreshAfterWrite将在查询数据的时候判断该数据是不是符合查询条件,如果符合条件该缓存就会去执行刷新操作。例如,您可以在同一个缓存中同时指定refreshAfterWriteexpireAfterWrite,只有当数据具备刷新条件的时候才会去刷新数据,不会盲目去执行刷新操作。如果数据在刷新后就一直没有被再次查询,那么该数据也会过期。

刷新操作是使用Executor异步执行的。默认执行程序是ForkJoinPool.commonPool(),可以通过Caffeine.executor(Executor)覆盖。

如果刷新时引发异常,则使用log记录日志,并不会抛出。

5. 统计

Cache<Key, Graph> graphs = Caffeine.newBuilder()
    .maximumSize(10_000)
    .recordStats()
    .build();

使用Caffeine.recordStats(),你可以打开统计功能。Cache.stats()方法会返回一个CacheStats对象,该对象提供以下统计信息:

hitRate(): 命中率
evictionCount(): 被剔除的条目数量
averageLoadPenalty(): 加载新值所花费的平均时间

6. 示例

终于要说到重点了
一般来讲,用Redis作为一级缓存,Caffeine作为二级缓存

单独使用Caffeine

Controller

 @RestController
 @RequestMapping("/student")
 public class StudentController {
 
     @Autowired
     private StudentService studentService;
 
     @GetMapping("/info/{studentId}")
     public Student info(@PathVariable("studentId") Integer studentId) {
         return studentService.getById(studentId);
     }
 
     @GetMapping("/getAll")
     public List<Student> getAll() {
         return studentService.getAll(Arrays.asList(101, 102, 103, 104, 105));
     }
 
     @GetMapping("/hitRate")
     public Double hitRate() {
         return studentService.hitRate();
     }
 }

Service

 @Service
 public class StudentService {
 
     @Resource(name = "studentCache")
     private LoadingCache<Integer, Student> studentCache;
 
     public Student getById(Integer id) {
         return studentCache.get(id);
     }
 
     public List<Student> getAll(List<Integer> idList) {
         Map<Integer, Student> studentMap = studentCache.getAll(idList);
         return studentMap.values().parallelStream().sorted(Comparator.comparing(Student::getId)).collect(Collectors.toList());
     }
 
     public Double hitRate() {
         return studentCache.stats().hitRate();
     }

     /**
      * 不直接写到本地缓存,而是先写到Redis,然后从Redis中读到本地
      */
}

Config

 @Slf4j
 @Configuration
 public class CacheConfig {
 
     @Autowired
     private StringRedisTemplate stringRedisTemplate;
 
     @Bean("studentCache")
     public LoadingCache<Integer, Student> studentCache() {
           return Caffeine.newBuilder()
                   .maximumSize(10).recordStats()
                   .expireAfterWrite(1, TimeUnit.HOURS)
                 //.scheduler(Scheduler.systemScheduler())  // 需要自定义调度器,用定时任务去主动提前刷新
                   .build(new CacheLoader<Integer, Student>() {
                       @Nullable
                       @Override
                       public Student load(@NonNull Integer key) throws Exception {
                           log.info("从缓存中加载...key={}", key);
                           HashOperations<String, String, String> hashOperations = stringRedisTemplate.opsForHash();
                           String value = hashOperations.get("STU_HS", String.valueOf(key));
                           if (StringUtils.isEmpty(value)) {
                               return null;
                           }
                           return JSON.parseObject(value, Student.class);
                       }
                   });
     }
}

结合Redis实现多级缓存

service

package com.example.express.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.example.express.domain.bean.DataCompany;

import java.util.List;

public interface DataCompanyService extends IService<DataCompany> {
    List<DataCompany> listAll();

    List<DataCompany> listAllByCache();

    DataCompany getByCache(Integer id);
}

serviceimpl

package com.example.express.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.express.common.cache.CommonDataCache;
import com.example.express.common.constant.RedisKeyConstant;
import com.example.express.domain.bean.DataCompany;
import com.example.express.mapper.DataCompanyMapper;
import com.example.express.service.DataCompanyService;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class DataCompanyServiceImpl extends ServiceImpl<DataCompanyMapper, DataCompany> implements DataCompanyService, ApplicationListener<ApplicationStartedEvent> {
    @Autowired
    private DataCompanyMapper dataCompanyMapper; // mybatis-plus
    @Autowired
    private RedisTemplate<String, DataCompany> redisTemplate; // redis

    @Override
    public List<DataCompany> listAll() {
        return dataCompanyMapper.selectList(null);
    }

    @Override
    public List<DataCompany> listAllByCache() {
        List<DataCompany> list = redisTemplate.opsForList().range(RedisKeyConstant.DATA_COMPANY, 0, -1);
        if(list == null) {
            list = listAll();
        }
        return list;
    }

    @Override
    public DataCompany getByCache(Integer id) {
        return CommonDataCache.dataCompanyCache.get(id);
    }
    /**
    * 监听容器启动事件
    * 数据库中查询所有的快递公司,并存放在redis缓存中
    **/
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        log.info("开始加载快递公司数据...");
        // 数据加载线程池
        ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory("data-company-loader"));
        executorService.scheduleWithFixedDelay(() -> {
            redisTemplate.delete(RedisKeyConstant.DATA_COMPANY);
            redisTemplate.opsForList().rightPushAll(RedisKeyConstant.DATA_COMPANY, listAll());
        }, 0, 10, TimeUnit.MINUTES);
    }
}

CommonDataCache

package com.example.express.common.cache;

import com.example.express.domain.bean.DataArea;
import com.example.express.domain.bean.DataCompany;
import com.example.express.domain.bean.DataSchool;
import com.example.express.domain.bean.UserEvaluate;
import com.example.express.service.DataAreaService;
import com.example.express.service.DataCompanyService;
import com.example.express.service.DataSchoolService;
import com.example.express.service.UserEvaluateService;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class CommonDataCache {
    @Autowired
    private DataAreaService dataAreaService;
    @Autowired
    private DataSchoolService dataSchoolService;
    @Autowired
    private DataCompanyService dataCompanyService;
    @Autowired
    private UserEvaluateService userEvaluateService;

    /**
     * 行政区域数据缓存
     * key: parentId
     */
    public static LoadingCache<Integer, List<DataArea>> dataAreaCache;
    /**
     * 学校数据缓存
     * key: 省份
     */
    public static LoadingCache<Integer, List<DataSchool>> dataSchoolCache;
    /**
     * 学校数据缓存
     * key: schoolId
     */
    public static LoadingCache<Integer, DataCompany> dataCompanyCache;
    /**
     * 用户评分Score
     * key: 用户ID
     */
    public static LoadingCache<String, String> userScoreCache;

    @PostConstruct
    private void init() {
        dataAreaCache = Caffeine.newBuilder()
                .maximumSize(35)
                .expireAfterWrite(5, TimeUnit.MINUTES)
                .build(parentId -> dataAreaService.listByParentId(parentId));

        dataSchoolCache = Caffeine.newBuilder()
                .maximumSize(35)
                .expireAfterWrite(1, TimeUnit.MINUTES)
                .build(provinceId -> dataSchoolService.listByProvinceId(provinceId));

        dataCompanyCache = Caffeine.newBuilder()
                .maximumSize(35)
                .expireAfterWrite(1, TimeUnit.MINUTES)
                .build(id -> dataCompanyService.getById(id));

        userScoreCache = Caffeine.newBuilder()
                .maximumSize(35)
                .expireAfterWrite(1, TimeUnit.MINUTES)
                .build(id -> {
                    UserEvaluate evaluate = userEvaluateService.getById(id);
                    return evaluate.getScore().toPlainString();
                });
    }
}

目前就很好的实现了热点数据非热点数据的相互兼容,实现了效率和空间占用的最大化
热点数据: 如:顺丰物流被大家用的最多,所以它的信息可以作为一个热点数据,存在本地缓存中
非热点数据: 如:整个物流公司列表,首先数据量很庞大,不适合存在本地缓存中
来自https://www.jianshu.com/p/9a80c662dac4

相关文章

网友评论

      本文标题:Caffeine缓存

      本文链接:https://www.haomeiwen.com/subject/bnzroctx.html