我们从api 调用 addVertex开始,hugegraph就开启了漫长的事务之旅,让我们看看如何自己实现一个事务。
事务的入口
- Hugegraph.AddVertex
从API 的调用链来看,hugegraph 的AddVertex 并没有马上提交,
而是先放入一个 叫做 addedVertexes 的 Map里面, hugegraph对于
图的操作都是先放入一个临时的map里面,等需要提交的时候,再对map做一个
预处理,然后统一提交,可以很好的支持batch 模式,看如下代码。
@Watched("graph.addVertex-instance")
public HugeVertex addVertex(HugeVertex vertex) {
this.checkOwnerThread();
// Override vertexes in local `removedVertexes`
this.removedVertexes.remove(vertex.id());
try {
this.locksTable.lockReads(LockUtil.VERTEX_LABEL_DELETE,
vertex.schemaLabel().id());
this.locksTable.lockReads(LockUtil.INDEX_LABEL_DELETE,
vertex.schemaLabel().indexLabels());
// Ensure vertex label still exists from vertex-construct to lock
this.graph().vertexLabel(vertex.schemaLabel().id());
/*
* No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label
* update only can add nullable properties and user data, which is
* unconcerned with add vertex
*/
this.beforeWrite();
this.addedVertexes.put(vertex.id(), vertex);
this.afterWrite();
} catch (Throwable e){
this.locksTable.unlock();
throw e;
}
return vertex;
}
- this.afterWrite 开始进入提交流程
通过对 afterWrite的调用链查找,可以看到 他们调用了 AbstractTransaction 的如下方法。
- commitToBackEnd
- commitToBackEnd会调用 prepareCommit方法,该方法需要具体的GraphTransaction进行重写。
- 调用 BackendStore 的入库方法,并操作的对象进行入库。
其中prepareCommit为关键,我们在下一个小结重点描述
protected void commit2Backend() {
BackendMutation mutation = this.prepareCommit();
assert !mutation.isEmpty();
this.commitMutation2Backend(mutation);
}
protected void commitMutation2Backend(BackendMutation... mutations) {
assert mutations.length > 0;
this.committing2Backend = true;
// If an exception occurred, catch in the upper layer and rollback
this.store.beginTx();
for (BackendMutation mutation : mutations) {
this.store.mutate(mutation);
}
this.store.commitTx();
this.committing2Backend = false;
}
protected BackendMutation prepareCommit() {
// For sub-class preparing data, nothing to do here
LOG.debug("Transaction prepareCommit()...");
return this.mutation();
}
hugegraph 的Transaction 体系。
- 类的继承关系
在上篇介绍 hugegraph 对象的时候,提到了系统实现了一套自己的Transaction对象,查看继承体系可以看到
分别为
- cachedGraphTransaction, 该类被hugegraph直接使用,加入了一个LRU的缓存,提高对节点和边的查询速度。
- cached transaction 继承于 GraphTransaction,在这里提供 addVertex,addEdge,query 相关的函数,上小节说的 prepareCommit也在这里实现。
- Graph transaction 继承 抽象类 AbstractTransaction,这里定义了操作事务的通用方法,通常是与节点,边的概念无关,都是些基本操作。
- Astract Transaction 实现了 Transaction 接口,包含commit,rollback 等方法。
cachedGraphTransaction 只是加入了LRU的cache,我们在cache 章节重点介绍。
下面主要介绍 GraphTransaction 。
- GraphTransaction主要功能。
前面提到最重要的就是 prepareCommit功能,我们看看 在GraphTransaction 里面是怎么实现的
protected BackendMutation prepareCommit() {
// Serialize and add updates into super.deletions
if (this.removedVertexes.size() > 0 || this.removedEdges.size() > 0) {
this.prepareDeletions(this.removedVertexes, this.removedEdges);
}
// Serialize and add updates into super.additions
if (this.addedVertexes.size() > 0 || this.addedEdges.size() > 0) {
this.prepareAdditions(this.addedVertexes, this.addedEdges);
}
return this.mutation();
}
protected void prepareAdditions(Map<Id, HugeVertex> addedVertexes,
Map<Id, HugeEdge> addedEdges) {
if (this.checkVertexExist) {
this.checkVertexExistIfCustomizedId(addedVertexes);
}
// Do vertex update
for (HugeVertex v : addedVertexes.values()) {
assert !v.removed();
v.committed();
// Add vertex entry
this.doInsert(this.serializer.writeVertex(v));
// Update index of vertex(only include props)
this.indexTx.updateVertexIndex(v, false);
this.indexTx.updateLabelIndex(v, false);
}
// Do edge update
for (HugeEdge e : addedEdges.values()) {
assert !e.removed();
e.committed();
// Skip edge if its owner has been removed
if (this.removingEdgeOwner(e)) {
continue;
}
// Add edge entry of OUT and IN
this.doInsert(this.serializer.writeEdge(e));
this.doInsert(this.serializer.writeEdge(e.switchOwner()));
// Update index of edge
this.indexTx.updateEdgeIndex(e, false);
this.indexTx.updateLabelIndex(e, false);
}
}
程序从临时的map 里面拿出来所有需要添加的节点,然后依次调用
this.doInsert(this.serializer.writeVertex(v));
// Update index of vertex(only include props)
this.indexTx.updateVertexIndex(v, false);
this.indexTx.updateLabelIndex(v, false);
- doInsert 加入该节点到 真正的backendStore 后端等待 commit时候提交。
- updateVertexIndex 和 updateLabelIndex 为 SchemaTransaction 的方法,主要是在添加新的节点之后,为他们建立索引的。
- schemaTransaction 是和 GraphTransaction 同级别的类,主要负责 schema 的操作,结构和 GraphTransaction 大同小异。
到这里 整个事务的调用链条就完整了。
- 每次 addVertex 调用了两次 commit
我们看到在 AddVertex 方法里面,自己调用了一次 commit。
对于hugegraph 的api,它会把 addVertex 又包裹在一个 commit 函数中,这个函数还会调用一次。
所以调用了两次
HugeGraph g = graph(manager, graph);
Vertex vertex = commit(g, () -> g.addVertex(jsonVertex.properties()));
网友评论