上一章说道GraphTransaction有一个子类CachedGraphTransaction,它增加了额外的缓存功能。
先看构造方法:
public CachedGraphTransaction(HugeGraph graph, BackendStore store) {
super(graph, store);
HugeConfig conf = graph.configuration();
int capacity = conf.get(CoreOptions.VERTEX_CACHE_CAPACITY);
int expire = conf.get(CoreOptions.VERTEX_CACHE_EXPIRE);
this.verticesCache = this.cache("vertex", capacity, expire);
capacity = conf.get(CoreOptions.EDGE_CACHE_CAPACITY);
expire = conf.get(CoreOptions.EDGE_CACHE_EXPIRE);
this.edgesCache = this.cache("edge", capacity, expire);
}
VERTEX_CACHE_CAPACITY 与 EDGE_CACHE_EXPIRE 都为配置文件可配置属性,设置
最大可缓存的节点和边。
Cache cache = CacheManager.instance().cache(name, capacity);
为实际使用的Cache对象。
在CacheManger中可以看到,使用RAMCache 实现了Cache的接口。
RAMCache使用的是内存方式,实现的方式为 LRU算法。
使用map 来记录 插入的对象,
使用queue链表来记录插入记录的 先后关系。
当 插入的节点超过容量的 1/2 的时候,会启动LRU 策略
也就是新的Hit 的节点,会被置换到 投节点来。
// Implement LRU cache
private final ConcurrentMap<Id, LinkNode<Id, Object>> map;
private final LinkedQueueNonBigLock<Id, Object> queue;
关于LRU的实现,网上有很多版本,在Cache里面提供了一个ticker方法。
这个是与expiration相关的。我们看到在cacheManger里面有一个schedular
会定期执行ticker方法。
private TimerTask scheduleTimer(float period) {
TimerTask task = new TimerTask() {
@Override
public void run() {
try {
for (Entry<String, Cache> entry : caches().entrySet()) {
this.tick(entry.getKey(), entry.getValue());
}
} catch (Throwable e) {
LOG.warn("An exception occurred when running tick", e);
}
}
private void tick(String name, Cache cache) {
long start = System.currentTimeMillis();
long items = cache.tick();
long cost = System.currentTimeMillis() - start;
if (cost > LOG_TICK_COST_TIME) {
LOG.info("Cache '{}' expired {} items cost {}ms > {}ms " +
"(size {}, expire {}ms)", name, items, cost,
LOG_TICK_COST_TIME, cache.size(), cache.expire());
}
LOG.debug("Cache '{}' expiration tick cost {}ms", name, cost);
}
};
// Schedule task with the period in seconds
this.timer.schedule(task, 0, (long) (period * 1000.0));
return task;
}
ticker 方法会遍历map里面的缓存数据,如果存活时间超过了 expiration的时间,就会被删除。
介绍完Cached再回到CachedGraphTransaction,他是在什么时候读取缓存的呢?
以查询为例:
private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
IdQuery newQuery = new IdQuery(HugeType.VERTEX, query);
List<HugeVertex> vertices = new ArrayList<>(query.ids().size());
for (Id vertexId : query.ids()) {
Object vertex = this.verticesCache.get(vertexId);
if (vertex != null) {
vertices.add((HugeVertex) vertex);
} else {
newQuery.query(vertexId);
}
}
if (vertices.isEmpty()) {
// Just use the origin query if find none from the cache
newQuery = query;
}
if (!newQuery.empty()) {
Iterator<HugeVertex> rs = super.queryVerticesFromBackend(newQuery);
while (rs.hasNext()) {
HugeVertex vertex = rs.next();
vertices.add(vertex);
this.verticesCache.update(vertex.id(), vertex);
}
}
return vertices.iterator();
}
对于有ID 的查询,先通过cache来查。
如果没有ID,则直接查询数据库,
对于查回来的数据集,同步到缓存中,以便不再miss。
hugegraph 缓存存在的问题
对于边的查询
List<HugeEdge> edges = (List<HugeEdge>) this.edgesCache.get(id);
if (edges == null) {
// Iterator can't be cached, caching list instead
edges = ImmutableList.copyOf(super.queryEdgesFromBackend(query));
if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) {
this.edgesCache.update(id, edges);
}
}
会把整个 query 作为key, result 作为value缓存起来。
edge 包含了 起始两个节点。例如 A_B
对节点A或者B进行更新的时候, hugegraph会把 cache中节点更新,但是并没有更新 缓存的边中涉及的节点。
这个时候就会造成脏数据。
所以在对一些路径查询的时候,会拿不到最新的数据,需要等到缓存失效才行。
正确的做法就是,在更新 节点的时候,需要去边的缓存里看一下,把对应的关联的节点也更新。
网友评论