美文网首页
二 Hugeraph 源代码 之 GraphTransacti

二 Hugeraph 源代码 之 GraphTransacti

作者: NazgulSun | 来源:发表于2019-06-19 10:56 被阅读0次

我们从api 调用 addVertex开始,hugegraph就开启了漫长的事务之旅,让我们看看如何自己实现一个事务。

事务的入口

  1. Hugegraph.AddVertex
    从API 的调用链来看,hugegraph 的AddVertex 并没有马上提交,
    而是先放入一个 叫做 addedVertexes 的 Map里面, hugegraph对于
    图的操作都是先放入一个临时的map里面,等需要提交的时候,再对map做一个
    预处理,然后统一提交,可以很好的支持batch 模式,看如下代码。
   @Watched("graph.addVertex-instance")
    public HugeVertex addVertex(HugeVertex vertex) {
        this.checkOwnerThread();

        // Override vertexes in local `removedVertexes`
        this.removedVertexes.remove(vertex.id());
        try {
            this.locksTable.lockReads(LockUtil.VERTEX_LABEL_DELETE,
                                      vertex.schemaLabel().id());
            this.locksTable.lockReads(LockUtil.INDEX_LABEL_DELETE,
                                      vertex.schemaLabel().indexLabels());
            // Ensure vertex label still exists from vertex-construct to lock
            this.graph().vertexLabel(vertex.schemaLabel().id());
            /*
             * No need to lock VERTEX_LABEL_ADD_UPDATE, because vertex label
             * update only can add nullable properties and user data, which is
             * unconcerned with add vertex
             */
            this.beforeWrite();
            this.addedVertexes.put(vertex.id(), vertex);
            this.afterWrite();
        } catch (Throwable e){
            this.locksTable.unlock();
            throw e;
        }
        return vertex;
    }
  1. this.afterWrite 开始进入提交流程

通过对 afterWrite的调用链查找,可以看到 他们调用了 AbstractTransaction 的如下方法。

  • commitToBackEnd
  • commitToBackEnd会调用 prepareCommit方法,该方法需要具体的GraphTransaction进行重写。
  • 调用 BackendStore 的入库方法,并操作的对象进行入库。

其中prepareCommit为关键,我们在下一个小结重点描述

    protected void commit2Backend() {
        BackendMutation mutation = this.prepareCommit();
        assert !mutation.isEmpty();
        this.commitMutation2Backend(mutation);
    }

    protected void commitMutation2Backend(BackendMutation... mutations) {
        assert mutations.length > 0;
        this.committing2Backend = true;

        // If an exception occurred, catch in the upper layer and rollback
        this.store.beginTx();
        for (BackendMutation mutation : mutations) {
            this.store.mutate(mutation);
        }
        this.store.commitTx();

        this.committing2Backend = false;
    }

    protected BackendMutation prepareCommit() {
        // For sub-class preparing data, nothing to do here
        LOG.debug("Transaction prepareCommit()...");
        return this.mutation();
    }

hugegraph 的Transaction 体系。

  1. 类的继承关系

在上篇介绍 hugegraph 对象的时候,提到了系统实现了一套自己的Transaction对象,查看继承体系可以看到
分别为

  • cachedGraphTransaction, 该类被hugegraph直接使用,加入了一个LRU的缓存,提高对节点和边的查询速度。
  • cached transaction 继承于 GraphTransaction,在这里提供 addVertex,addEdge,query 相关的函数,上小节说的 prepareCommit也在这里实现。
  • Graph transaction 继承 抽象类 AbstractTransaction,这里定义了操作事务的通用方法,通常是与节点,边的概念无关,都是些基本操作。
  • Astract Transaction 实现了 Transaction 接口,包含commit,rollback 等方法。

cachedGraphTransaction 只是加入了LRU的cache,我们在cache 章节重点介绍。
下面主要介绍 GraphTransaction 。

  1. GraphTransaction主要功能。
    前面提到最重要的就是 prepareCommit功能,我们看看 在GraphTransaction 里面是怎么实现的
protected BackendMutation prepareCommit() {
        // Serialize and add updates into super.deletions
        if (this.removedVertexes.size() > 0 || this.removedEdges.size() > 0) {
            this.prepareDeletions(this.removedVertexes, this.removedEdges);
        }
        // Serialize and add updates into super.additions
        if (this.addedVertexes.size() > 0 || this.addedEdges.size() > 0) {
            this.prepareAdditions(this.addedVertexes, this.addedEdges);
        }
        return this.mutation();
    }

    protected void prepareAdditions(Map<Id, HugeVertex> addedVertexes,
                                    Map<Id, HugeEdge> addedEdges) {
        if (this.checkVertexExist) {
            this.checkVertexExistIfCustomizedId(addedVertexes);
        }
        // Do vertex update
        for (HugeVertex v : addedVertexes.values()) {
            assert !v.removed();
            v.committed();
            // Add vertex entry
            this.doInsert(this.serializer.writeVertex(v));
            // Update index of vertex(only include props)
            this.indexTx.updateVertexIndex(v, false);
            this.indexTx.updateLabelIndex(v, false);
        }

        // Do edge update
        for (HugeEdge e : addedEdges.values()) {
            assert !e.removed();
            e.committed();
            // Skip edge if its owner has been removed
            if (this.removingEdgeOwner(e)) {
                continue;
            }
            // Add edge entry of OUT and IN
            this.doInsert(this.serializer.writeEdge(e));
            this.doInsert(this.serializer.writeEdge(e.switchOwner()));
            // Update index of edge
            this.indexTx.updateEdgeIndex(e, false);
            this.indexTx.updateLabelIndex(e, false);
        }
    }

程序从临时的map 里面拿出来所有需要添加的节点,然后依次调用

            this.doInsert(this.serializer.writeVertex(v));
            // Update index of vertex(only include props)
            this.indexTx.updateVertexIndex(v, false);
            this.indexTx.updateLabelIndex(v, false);
  • doInsert 加入该节点到 真正的backendStore 后端等待 commit时候提交。
  • updateVertexIndex 和 updateLabelIndex 为 SchemaTransaction 的方法,主要是在添加新的节点之后,为他们建立索引的。
  • schemaTransaction 是和 GraphTransaction 同级别的类,主要负责 schema 的操作,结构和 GraphTransaction 大同小异。

到这里 整个事务的调用链条就完整了。

  1. 每次 addVertex 调用了两次 commit

我们看到在 AddVertex 方法里面,自己调用了一次 commit。

对于hugegraph 的api,它会把 addVertex 又包裹在一个 commit 函数中,这个函数还会调用一次。
所以调用了两次

        HugeGraph g = graph(manager, graph);
        Vertex vertex = commit(g, () -> g.addVertex(jsonVertex.properties()));

相关文章

网友评论

      本文标题:二 Hugeraph 源代码 之 GraphTransacti

      本文链接:https://www.haomeiwen.com/subject/trehqctx.html