美文网首页
Flink维表关联系列3-LRU策略

Flink维表关联系列3-LRU策略

作者: LZhan | 来源:发表于2019-11-08 09:12 被阅读0次
    1.前言

    LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据。

    在Flink中做维表关联时,如果维表的数据比较大,无法一次性全部加载到内存中,而在业务上也允许一定的延时,那么可以使用LRU策略加载维表数据。
    但是如果一条维表数据一直都被缓存命中,这条数据永远都不会被淘汰,这时维表的数据已经发生改变,那么将会在很长时间或者永远都无法更新这条改变,所以需要设置缓存超时时间TTL,当缓存时间超过ttl,会强制性使其失效重新从外部加载进来。

    2.常用的LRU使用

    <1> Guava Cache
    google guava提供了Cache缓存模块,轻量级,适合做本地缓存,能够做到以下几点:
    a.可配置本地缓存大小
    b.可配置缓存过期时间
    c.可配置淘汰策略

    实例,通常在使用时,是在继承异步查询类RichAsyncFunction中的open方法中定义一个缓存,如下所示,
    应用CacheBuilder来构建

     @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
    
            this._storeCache = CacheBuilder
                    .newBuilder()
                    .expireAfterWrite(7, TimeUnit.DAYS)
                    .concurrencyLevel(this.capacity)
                    .build(new CacheLoader<Long, Optional<Store>>() {
    
                        @Override
                        public Optional<Store> load(Long storeId) throws Exception {
                            Store s = null;
    
                            List<Store> results = createQueryRunner()
                                    .query(JoinStore.this.sql, new BeanListHandler<>(Store.class), storeId);
                            if (results != null && results.size() > 0) {
                                s =  results.get(0);
                            }
    
                            return Optional.ofNullable(s);
                        }
    
                    });
        }
    
        private Future<Optional<Item>> query(Long itemId) {
            return asyncCall(() -> this._itemCache.get(itemId));
        }
    

    CacheBuilder通过使用CacheLoader进行自动加载,通过使用get方法获得返回结果,如果有则直接返回,如果没有,则使用CacheLoader的load方法去运算,接着将结果放到缓存中,最后才返回结果。


    <2> LinkedHashMap
    LinkedHashMap是双向链表+hash表的结构,普通的hash表访问是没有顺序的,通过加上元素之间的指向关系保证元素之间的顺序,默认是按照插入顺序的,插入是链表尾部,取数据是链表头部,也就是访问的顺序与插入的顺序是一致的。要想其具有LRU特性,那么就将其改为访问顺序,插入还是在链表尾部,但是数据访问会将其移动达到链表的尾部,那么最近插入或者访问的数据永远都在链表尾部,被访问较少的数据就在链表的头部,给 LinkedHashMap设置一个大小,当数据大小超过该值,就直接从链表头部移除数据。
    LinkedHashMap本身不具有ttl功能,就是无法知晓数据是否过期,可以通过给数据封装一个时间字段insertTimestamp,表示数据加载到内存的时间,当这条记录被命中,首先判断当前时间currentTimestamp与insertTimestamp差值是否达到ttl, 如果达到了就重新从外部存储中查询加载到内存中。

    3.LRU方式读取Hbase

    实现思路:

    • 1.使用Flink 异步IO RichAsyncFunction去异步读取hbase的数据,那么需要hbase 客户端支持异步读取,默认hbase客户端是同步,可使用hbase 提供的asynchbase 客户端;
    • 2.初始化一个Cache 并且设置最大缓存容量与数据过期时间;
    • 3.数据读取逻辑:先根据Key从Cache中查询value,如果能够查询到则返回,如果没有查询到结果则使用asynchbase查询数据,并且将查询的结果插入Cache中,然后返回
    public class HbaseAsyncLRU extends RichAsyncFunction<OrderItemOld, OrderItemOld> {
    
    
    
        private HBaseClient hBaseClient;
    
        private LoadingCache<Long, Optional<Item>> _memberCache;
    
        private String tableName;
    
        public HbaseAsyncLRU(String tableName,String zk){
            this.hBaseClient=new HBaseClient(zk);
            this.tableName=tableName;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            _memberCache= CacheBuilder
                    .newBuilder()
                    .maximumSize(2000)
                    .expireAfterWrite(1, TimeUnit.HOURS)
                    .build(new CacheLoader<Long, Optional<Item>>(){
    
                        @Override
                        public Optional<Item> load(Long memberId) throws Exception {
    
                            //执行hbase查询工作
                            GetRequest get=new GetRequest(tableName,memberId.toString());
                            hBaseClient.get(get).addCallbacks(new Callback<String, ArrayList<KeyValue>>() {
                                @Override
                                public String call(ArrayList<KeyValue> keyValues) throws Exception {
                                    //todo
                                    return null;
                                }
                            }, new Callback<String, Exception>() {
                                @Override
                                public String call(Exception e) throws Exception {
                                    //todo
                                    return null;
                                }
                            });
    
                            return Optional.empty();
                        }
                    });
        }
    
    
        @Override
        public void asyncInvoke(OrderItemOld orderItemOld, ResultFuture<OrderItemOld> resultFuture) throws Exception {
    
        }
    
        @Override
        public void timeout(OrderItemOld input, ResultFuture<OrderItemOld> resultFuture) throws Exception {
    
        }
    
        @Override
        public void close() throws Exception {
            super.close();
        }
    
    }
    

    AsyncHBase的文档地址 http://opentsdb.github.io/asynchbase/ 打不开了,所以关于HBaseClient的部分没有完成。 等之后看看能不能用梯子打开这个文档,再将代码补充完成。

    相关文章

      网友评论

          本文标题:Flink维表关联系列3-LRU策略

          本文链接:https://www.haomeiwen.com/subject/ayrubctx.html