1.前言
LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据。
在Flink中做维表关联时,如果维表的数据比较大,无法一次性全部加载到内存中,而在业务上也允许一定的延时,那么可以使用LRU策略加载维表数据。
但是如果一条维表数据一直都被缓存命中,这条数据永远都不会被淘汰,这时维表的数据已经发生改变,那么将会在很长时间或者永远都无法更新这条改变,所以需要设置缓存超时时间TTL,当缓存时间超过ttl,会强制性使其失效重新从外部加载进来。
2.常用的LRU使用
<1> Guava Cache
google guava提供了Cache缓存模块,轻量级,适合做本地缓存,能够做到以下几点:
a.可配置本地缓存大小
b.可配置缓存过期时间
c.可配置淘汰策略
实例,通常在使用时,是在继承异步查询类RichAsyncFunction中的open方法中定义一个缓存,如下所示,
应用CacheBuilder来构建
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this._storeCache = CacheBuilder
.newBuilder()
.expireAfterWrite(7, TimeUnit.DAYS)
.concurrencyLevel(this.capacity)
.build(new CacheLoader<Long, Optional<Store>>() {
@Override
public Optional<Store> load(Long storeId) throws Exception {
Store s = null;
List<Store> results = createQueryRunner()
.query(JoinStore.this.sql, new BeanListHandler<>(Store.class), storeId);
if (results != null && results.size() > 0) {
s = results.get(0);
}
return Optional.ofNullable(s);
}
});
}
private Future<Optional<Item>> query(Long itemId) {
return asyncCall(() -> this._itemCache.get(itemId));
}
CacheBuilder通过使用CacheLoader进行自动加载,通过使用get方法获得返回结果,如果有则直接返回,如果没有,则使用CacheLoader的load方法去运算,接着将结果放到缓存中,最后才返回结果。
<2> LinkedHashMap
LinkedHashMap是双向链表+hash表的结构,普通的hash表访问是没有顺序的,通过加上元素之间的指向关系保证元素之间的顺序,默认是按照插入顺序的,插入是链表尾部,取数据是链表头部,也就是访问的顺序与插入的顺序是一致的。要想其具有LRU特性,那么就将其改为访问顺序,插入还是在链表尾部,但是数据访问会将其移动达到链表的尾部,那么最近插入或者访问的数据永远都在链表尾部,被访问较少的数据就在链表的头部,给 LinkedHashMap设置一个大小,当数据大小超过该值,就直接从链表头部移除数据。
LinkedHashMap本身不具有ttl功能,就是无法知晓数据是否过期,可以通过给数据封装一个时间字段insertTimestamp,表示数据加载到内存的时间,当这条记录被命中,首先判断当前时间currentTimestamp与insertTimestamp差值是否达到ttl, 如果达到了就重新从外部存储中查询加载到内存中。
3.LRU方式读取Hbase
实现思路:
- 1.使用Flink 异步IO RichAsyncFunction去异步读取hbase的数据,那么需要hbase 客户端支持异步读取,默认hbase客户端是同步,可使用hbase 提供的asynchbase 客户端;
- 2.初始化一个Cache 并且设置最大缓存容量与数据过期时间;
- 3.数据读取逻辑:先根据Key从Cache中查询value,如果能够查询到则返回,如果没有查询到结果则使用asynchbase查询数据,并且将查询的结果插入Cache中,然后返回
public class HbaseAsyncLRU extends RichAsyncFunction<OrderItemOld, OrderItemOld> {
private HBaseClient hBaseClient;
private LoadingCache<Long, Optional<Item>> _memberCache;
private String tableName;
public HbaseAsyncLRU(String tableName,String zk){
this.hBaseClient=new HBaseClient(zk);
this.tableName=tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
_memberCache= CacheBuilder
.newBuilder()
.maximumSize(2000)
.expireAfterWrite(1, TimeUnit.HOURS)
.build(new CacheLoader<Long, Optional<Item>>(){
@Override
public Optional<Item> load(Long memberId) throws Exception {
//执行hbase查询工作
GetRequest get=new GetRequest(tableName,memberId.toString());
hBaseClient.get(get).addCallbacks(new Callback<String, ArrayList<KeyValue>>() {
@Override
public String call(ArrayList<KeyValue> keyValues) throws Exception {
//todo
return null;
}
}, new Callback<String, Exception>() {
@Override
public String call(Exception e) throws Exception {
//todo
return null;
}
});
return Optional.empty();
}
});
}
@Override
public void asyncInvoke(OrderItemOld orderItemOld, ResultFuture<OrderItemOld> resultFuture) throws Exception {
}
@Override
public void timeout(OrderItemOld input, ResultFuture<OrderItemOld> resultFuture) throws Exception {
}
@Override
public void close() throws Exception {
super.close();
}
}
AsyncHBase的文档地址 http://opentsdb.github.io/asynchbase/ 打不开了,所以关于HBaseClient的部分没有完成。 等之后看看能不能用梯子打开这个文档,再将代码补充完成。
网友评论