美文网首页
十九 Gremlin Step 模型

十九 Gremlin Step 模型

作者: NazgulSun | 来源:发表于2021-11-06 14:51 被阅读0次

    hugegraph Step 模型

    Step 接口

    几个观点的点:

    • step 集成了 Traverser 的 迭代器,一个step 包含了多个traverser。我们可以认为traverser就是对一条边的遍历; 一个step 就是一个遍历的集合,比如访问 v().outE(),
      outE() 就是一个范围邻接边的step,有n条边,就有n个traverser;
    • step的addStrate 方法,可以理解为添加traverser的源节点;
    • previousStep,和 nextStep, 就是当前step保存了 前后的step的引用,step 是放在traversal中的,traversal 是一个step 的linklist,代表我们的查询计划;
    public interface Step<S, E> extends Iterator<Traverser.Admin<E>>, Serializable, Cloneable {
    
        /**
         * Add a iterator of {@link Traverser.Admin} objects of type S to the step.
         *
         * @param starts The iterator of objects to add
         */
        public void addStarts(final Iterator<Traverser.Admin<S>> starts);
    
        /**
         * Add a single {@link Traverser.Admin} to the step.
         *
         * @param start The traverser to add
         */
        public void addStart(final Traverser.Admin<S> start);
    
        /**
         * Set the step that is previous to the current step.
         * Used for linking steps together to form a function chain.
         *
         * @param step the previous step of this step
         */
        public void setPreviousStep(final Step<?, S> step);
    
        /**
         * Get the step prior to the current step.
         *
         * @return The previous step
         */
        public Step<?, S> getPreviousStep();
    
    

    AbstractStep Step的抽象实现

        protected ExpandableStepIterator<S> starts;
        protected Traverser.Admin<E> nextEnd = null;
        protected boolean traverserStepIdAndLabelsSetByChild = false;
    
        protected Step<?, S> previousStep = EmptyStep.instance();
        protected Step<E, ?> nextStep = EmptyStep.instance();
    
        public AbstractStep(final Traversal.Admin traversal) {
            this.traversal = traversal;
            this.starts = new ExpandableStepIterator<>(this);
        }
            @Override
        public void addStarts(final Iterator<Traverser.Admin<S>> starts) {
            this.starts.add(starts);
        }
    
        @Override
        public void addStart(final Traverser.Admin<S> start) {
            this.starts.add(start);
        }
    
    

    最重要的是引入了starts的结构,为ExpandableStepIterator, 我们知道一个step 必须要初始化他的start 我们才能运行起来;
    start 决定了step的开始节点结合; addStart就是把元素放入到starts结构里面;

    接下来看 hasNext方法,就是这个step 是否已经执行完,没有执行完,就继续:

        @Override
        public boolean hasNext() {
            if (null != this.nextEnd)
                return true;
            else {
                try {
                    while (true) {
                        if (Thread.interrupted()) throw new TraversalInterruptedException();
                        this.nextEnd = this.processNextStart();
                        if (null != this.nextEnd.get() && 0 != this.nextEnd.bulk())
                            return true;
                        else
                            this.nextEnd = null;
                    }
                } catch (final NoSuchElementException e) {
                    return false;
                }
            }
        }
    
    

    this.nextEnd = this.processNextStart();
    通过nextEnd来判断,没处理一个 start 节点,就会有一个 nextEnd结果,知道NoSuchElement为止;
    对于ProcessNextStart,我们看到是由各个具体的step自己来实现的;

    举个例子 g.V().outE()

    系统首先把gremlin 编译为一个DefaultTraversal, Traversal 是一个iterator 迭代器, 有一个filler负责迭代收集结果:
    Traveral 的step-chains
    [HugeGraphStep(vertex,[]), HugeVertexStep(OUT,edge)]
    @Override
    public boolean hasNext() {
    if (!this.locked) this.applyStrategies();
    return this.lastTraverser.bulk() > 0L || this.finalEndStep.hasNext();
    }
    finalEndStep 就是 HugeVertexStep: outE()

    第一步就是调用 hasNext获取 结果,然后调用this.nextEnd = this.processNextStart();

    第一次的调用时候,这是一个空的step; 因为vertexStep 集成了flatMap,所以进入了FlatMap的方法

        @Override
        protected Traverser.Admin<E> processNextStart() {
            while (true) {
                if (this.iterator.hasNext()) {
                    return this.head.split(this.iterator.next(), this);
                } else {
                    closeIterator();
                    this.head = this.starts.next();
                    this.iterator = this.flatMap(this.head);
                }
            }
        }
    

    this.head = this.starts.next();

    我们之前说到 starts是一个extendedIterator,我们看看他的next方法:

        public Traverser.Admin<S> next() {
            if (!this.traverserSet.isEmpty())
                return this.traverserSet.remove();
            /////////////
            if (this.hostStep.getPreviousStep().hasNext())
                return this.hostStep.getPreviousStep().next();
            /////////////
            return this.traverserSet.remove();
        }
    

    调用了是前一步的next方法,通过这里我们就感觉到了,一个step 是否和被递归调用,通过 尾部的step ,尝试去获得结果,没有就调用它的上一层step去获得结果;

    对于这个case 就是会调用 graphStep:
    graphStep的关键:

        protected Traverser.Admin<E> processNextStart() {
            while (true) {
                if (this.iterator.hasNext()) {
                    return this.isStart ? this.getTraversal().getTraverserGenerator().generate(this.iterator.next(), (Step) this, 1l) : this.head.split(this.iterator.next(), this);
                } else {
                    if (this.isStart) {
                        if (this.done)
                            throw FastNoSuchElementException.instance();
                        else {
                            this.done = true;
                            this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
                        }
                    } else {
                        this.head = this.starts.next();
                        this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
                    }
                }
            }
        }
    

    this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();

    我们看到,最后是通过IteratorSupplier。get方法获取元素, 那么这个iteratorSupplier就是关键;

    回到hugegraph,我们看到hugegraph重写了 HugeGraphStep,本质就是提供这个supplier,从自己的底层获取开始遍历的节点:

    这个从低向上不断拉取得过程就是整个traversal 执行大致流程;

    以HugeCountGraph 为例,解析如何定制化step

    hugegraph 在0.11版本加入了 hugeCount step, 优化了g.V().count()这样的step,可以去后端直接查询统计数据,而不用遍历整个图;
    我们看看是怎么做的:
    首先需要一个一个strategy,把原油的globalCount换成HugeCount;

        public void apply(Traversal.Admin<?, ?> traversal) {
            TraversalUtil.convAllHasSteps(traversal);
    
            // Extract CountGlobalStep
            List<CountGlobalStep> steps = TraversalHelper.getStepsOfClass(
                                          CountGlobalStep.class, traversal);
            if (steps.isEmpty()) {
                return;
            }
    
            // Find HugeGraphStep before count()
            CountGlobalStep<?> originStep = steps.get(0);
            List<Step<?, ?>> originSteps = new ArrayList<>();
            HugeGraphStep<?, ? extends Element> graphStep = null;
            Step<?, ?> step = originStep;
            do {
                if (!(step instanceof CountGlobalStep ||
                      step instanceof GraphStep ||
                      step instanceof IdentityStep ||
                      step instanceof NoOpBarrierStep ||
                      step instanceof CollectingBarrierStep) ||
                     (step instanceof TraversalParent &&
                      TraversalHelper.anyStepRecursively(s -> {
                          return s instanceof SideEffectStep ||
                                 s instanceof AggregateStep;
                      }, (TraversalParent) step))) {
                    return;
                }
                originSteps.add(step);
                if (step instanceof HugeGraphStep) {
                    graphStep = (HugeGraphStep<?, ? extends Element>) step;
                    break;
                }
                step = step.getPreviousStep();
            } while (step != null);
    
            if (graphStep == null) {
                return;
            }
    
            // Replace with HugeCountStep
            graphStep.queryInfo().aggregate(AggregateFunc.COUNT, null);
            HugeCountStep<?> countStep = new HugeCountStep<>(traversal, graphStep);
            for (Step<?, ?> origin : originSteps) {
                traversal.removeStep(origin);
            }
            traversal.addStep(0, countStep);
        }
    

    我们看到 目前只处理g.V().count()或者g.E().count()这种在 graphStep层面上的count,其他模式的count不支持;

    接下里,我们看看countStep 是如何实现的。

    @Override
    protected Admin<Long> processNextStart() throws NoSuchElementException {
        if (this.done) {
            throw FastNoSuchElementException.instance();
        }
        this.done = true;
        @SuppressWarnings({ "unchecked", "rawtypes" })
        Step<Long, Long> step = (Step) this;
        return this.getTraversal().getTraverserGenerator()
                   .generate(this.originGraphStep.count(), step, 1L);
    }
    

    我们看到 processNextStart的方法

    • 至调用一次,然后就是 FastNoSuchElementException,告诉下游,没有元素了
    • 返回count结果,并生成traverser返回给下游;

    从整个模型来看,是一个比较典型的从底向上的地鬼调用,我们可以修改 其中的step,做相应的优化。
    比如 对于 out().out() 这种两调的查询,我们可以做一个 twohupStep,然后 batch的方式拿回数据,生成traverser。减少目前的onebyone的迭代消耗;

    对于之前paypal在infoQ提到的多线程模型查询,目前看,不太清楚是在哪个层面,实在生成多个 traverser,然后并发去拉取? 还是在 step internal 去并发访问存储?
    后续继续探索;

    相关文章

      网友评论

          本文标题:十九 Gremlin Step 模型

          本文链接:https://www.haomeiwen.com/subject/zedjzltx.html