美文网首页
堆外内存及其在 RxCache 中的使用

堆外内存及其在 RxCache 中的使用

作者: fengzhizi715 | 来源:发表于2019-01-13 23:29 被阅读0次
    beautiful.jpg

    RxCache

    RxCache 是一款支持 Java 和 Android 的 Local Cache 。目前,支持堆内存、堆外内存(off-heap memory)、磁盘缓存。

    github地址:https://github.com/fengzhizi715/RxCache

    堆外内存(off-heap memory)

    对象可以存储在 堆内存、堆外内存、磁盘缓存甚至是分布式缓存。

    在 Java 中,与堆外内存相对的是堆内存。堆内存遵守 JVM 的内存管理机制,而堆外内存不受到此限制,它由操作系统进行管理。

    JVM的内存管理以及堆外内存.jpg

    堆外内存和堆内存有明显的区别,或者说有相反的应用场景。

    堆外内存更适合:

    • 存储生命周期长的对象
    • 可以在进程间可以共享,减少 JVM 间的对象复制,使得 JVM 的分割部署更容易实现。
    • 本地缓存,减少磁盘缓存或者分布式缓存的响应时间。

    RxCache 中使用的堆外内存

    首先,创建一个 DirectBufferConverter ,用于将对象和 ByteBuffer 相互转换,以及对象和byte数组相互转换。其中,ByteBuffer.allocteDirect(capability) 用于分配堆外内存。Cleaner 是自己定义的一个类,用于释放 DirectByteBuffer。具体代码可以查看:https://github.com/fengzhizi715/RxCache/blob/master/offheap/src/main/java/com/safframework/rxcache/offheap/converter/Cleaner.java

    public abstract class DirectBufferConverter<V> {
    
        public void dispose(ByteBuffer direct) {
    
            Cleaner.clean(direct);
        }
    
        public ByteBuffer to(V from) {
            if(from == null) return null;
    
            byte[] bytes = toBytes(from);
            ByteBuffer.wrap(bytes);
            ByteBuffer bf = ByteBuffer.allocateDirect(bytes.length);
            bf.put(bytes);
            bf.flip();
            return bf;
        }
    
        abstract public byte[] toBytes(V value);
    
        abstract public V toObject(byte[] value);
    
        public V from(ByteBuffer to) {
            if(to == null) return null;
    
            byte[] bs = new byte[to.capacity()];
            to.get(bs);
            to.flip();
            return toObject(bs);
        }
    
    }
    

    接下来,定义一个 ConcurrentDirectHashMap<K, V> 实现Map接口。它是一个范性,支持将 V 转换成 ByteBuffer 类型,存储到 ConcurrentDirectHashMap 的 map 中。

    public abstract class ConcurrentDirectHashMap<K, V> implements Map<K, V> {
    
        final private Map<K, ByteBuffer> map;
    
        private final DirectBufferConverter<V> converter = new DirectBufferConverter<V>() {
    
            @Override
            public byte[] toBytes(V value) {
                return convertObjectToBytes(value);
            }
    
            @Override
            public V toObject(byte[] value) {
                return convertBytesToObject(value);
            }
        };
    
        ConcurrentDirectHashMap() {
    
            map = new ConcurrentHashMap<>();
        }
    
        ConcurrentDirectHashMap(Map<K, V> m) {
    
            map = new ConcurrentHashMap<>();
    
            for (Entry<K, V> entry : m.entrySet()) {
                K key = entry.getKey();
                ByteBuffer val = converter.to(entry.getValue());
                map.put(key, val);
            }
        }
    
        protected abstract byte[] convertObjectToBytes(V value);
    
        protected abstract V convertBytesToObject(byte[] value);
    
        @Override
        public int size() {
            return map.size();
        }
    
        @Override
        public boolean isEmpty() {
            return map.isEmpty();
        }
    
        @Override
        public boolean containsKey(Object key) {
            return map.containsKey(key);
        }
    
        @Override
        public V get(Object key) {
            final ByteBuffer byteBuffer = map.get(key);
            return converter.from(byteBuffer);
        }
    
        @Override
        public V put(K key, V value) {
            final ByteBuffer byteBuffer = map.put(key, converter.to(value));
            converter.dispose(byteBuffer);
            return converter.from(byteBuffer);
        }
    
        @Override
        public V remove(Object key) {
            final ByteBuffer byteBuffer = map.remove(key);
            final V value = converter.from(byteBuffer);
            converter.dispose(byteBuffer);
            return value;
        }
    
        @Override
        public void putAll(Map<? extends K, ? extends V> m) {
            for (Entry<? extends K, ? extends V> entry : m.entrySet()) {
                ByteBuffer byteBuffer = converter.to(entry.getValue());
                map.put(entry.getKey(), byteBuffer);
            }
        }
    
        @Override
        public void clear() {
            final Set<K> keys = map.keySet();
    
            for (K key : keys) {
                map.remove(key);
            }
        }
    
        @Override
        public Set<K> keySet() {
            return map.keySet();
        }
    
        @Override
        public Collection<V> values() {
            Collection<V> values = new ArrayList<>();
    
            for (ByteBuffer byteBuffer : map.values())
            {
                V value = converter.from(byteBuffer);
                values.add(value);
            }
            return values;
        }
    
        @Override
        public Set<Entry<K, V>> entrySet() {
            Set<Entry<K, V>> entries = new HashSet<>();
    
            for (Entry<K, ByteBuffer> entry : map.entrySet()) {
                K key = entry.getKey();
                V value = converter.from(entry.getValue());
    
                entries.add(new Entry<K, V>() {
                    @Override
                    public K getKey() {
                        return key;
                    }
    
                    @Override
                    public V getValue() {
                        return value;
                    }
    
                    @Override
                    public V setValue(V v) {
                        return null;
                    }
                });
            }
    
            return entries;
        }
    
        @Override
        public boolean containsValue(Object value) {
    
            for (ByteBuffer v : map.values()) {
                if (v.equals(value)) {
                    return true;
                }
            }
            return false;
        }
    }
    

    创建 ConcurrentStringObjectDirectHashMap,它的 K 是 String 类型,V 是任意的 Object 对象。其中,序列化和反序列化采用《Java 字节的常用封装》提到的 bytekit

    public class ConcurrentStringObjectDirectHashMap extends ConcurrentDirectHashMap<String,Object> {
    
        @Override
        protected byte[] convertObjectToBytes(Object value) {
    
            return Bytes.serialize(value);
        }
    
        @Override
        protected Object convertBytesToObject(byte[] value) {
    
            return Bytes.deserialize(value);
        }
    }
    

    基于 FIFO 以及堆外内存来实现 Memory 级别的缓存。

    public class DirectBufferMemoryImpl extends AbstractMemoryImpl {
    
        private ConcurrentStringObjectDirectHashMap cache;
        private List<String> keys;
    
        public DirectBufferMemoryImpl(long maxSize) {
    
            super(maxSize);
            cache = new ConcurrentStringObjectDirectHashMap();
            this.keys = new LinkedList<>();
        }
    
        @Override
        public <T> Record<T> getIfPresent(String key) {
    
            T result = null;
    
            if(expireTimeMap.get(key)!=null) {
    
                if (expireTimeMap.get(key)<0) { // 缓存的数据从不过期
    
                    result = (T) cache.get(key);
                } else {
    
                    if (timestampMap.get(key) + expireTimeMap.get(key) > System.currentTimeMillis()) {  // 缓存的数据还没有过期
    
                        result = (T) cache.get(key);
                    } else {                     // 缓存的数据已经过期
    
                        evict(key);
                    }
                }
            }
    
            return result != null ? new Record<>(Source.MEMORY,key, result, timestampMap.get(key),expireTimeMap.get(key)) : null;
        }
    
        @Override
        public <T> void put(String key, T value) {
    
            put(key,value, Constant.NEVER_EXPIRE);
        }
    
        @Override
        public <T> void put(String key, T value, long expireTime) {
    
            if (keySet().size()<maxSize) { // 缓存还有空间
    
                saveValue(key,value,expireTime);
            } else {                       // 缓存空间不足,需要删除一个
    
                if (containsKey(key)) {
    
                    keys.remove(key);
    
                    saveValue(key,value,expireTime);
                } else {
    
                    String oldKey = keys.get(0); // 最早缓存的key
                    evict(oldKey);               // 删除最早缓存的数据 FIFO算法
    
                    saveValue(key,value,expireTime);
                }
            }
        }
    
        private <T> void saveValue(String key, T value, long expireTime) {
    
            cache.put(key,value);
            timestampMap.put(key,System.currentTimeMillis());
            expireTimeMap.put(key,expireTime);
            keys.add(key);
        }
    
        @Override
        public Set<String> keySet() {
    
            return cache.keySet();
        }
    
        @Override
        public boolean containsKey(String key) {
    
            return cache.containsKey(key);
        }
    
        @Override
        public void evict(String key) {
    
            cache.remove(key);
            timestampMap.remove(key);
            expireTimeMap.remove(key);
            keys.remove(key);
        }
    
        @Override
        public void evictAll() {
    
            cache.clear();
            timestampMap.clear();
            expireTimeMap.clear();
            keys.clear();
        }
    }
    

    到了这里,已经完成了堆外内存在 RxCache 中的封装。其实,已经有很多缓存框架都支持堆外内存,例如 Ehcache、MapDB 等。RxCache 目前已经有了 MapDB 的模块。

    总结

    RxCache 是一款 Local Cache,它已经应用到我们项目中,也在我个人的爬虫框架 NetDiscovery 中使用。未来,它会作为一个成熟的组件,不断运用到公司和个人的其他项目中。

    相关文章

      网友评论

          本文标题:堆外内存及其在 RxCache 中的使用

          本文链接:https://www.haomeiwen.com/subject/lboldqtx.html