hugegraph Step 模型
Step 接口
几个观点的点:
- step 集成了 Traverser 的 迭代器,一个step 包含了多个traverser。我们可以认为traverser就是对一条边的遍历; 一个step 就是一个遍历的集合,比如访问 v().outE(),
outE() 就是一个范围邻接边的step,有n条边,就有n个traverser; - step的addStrate 方法,可以理解为添加traverser的源节点;
- previousStep,和 nextStep, 就是当前step保存了 前后的step的引用,step 是放在traversal中的,traversal 是一个step 的linklist,代表我们的查询计划;
public interface Step<S, E> extends Iterator<Traverser.Admin<E>>, Serializable, Cloneable {
/**
* Add a iterator of {@link Traverser.Admin} objects of type S to the step.
*
* @param starts The iterator of objects to add
*/
public void addStarts(final Iterator<Traverser.Admin<S>> starts);
/**
* Add a single {@link Traverser.Admin} to the step.
*
* @param start The traverser to add
*/
public void addStart(final Traverser.Admin<S> start);
/**
* Set the step that is previous to the current step.
* Used for linking steps together to form a function chain.
*
* @param step the previous step of this step
*/
public void setPreviousStep(final Step<?, S> step);
/**
* Get the step prior to the current step.
*
* @return The previous step
*/
public Step<?, S> getPreviousStep();
AbstractStep Step的抽象实现
protected ExpandableStepIterator<S> starts;
protected Traverser.Admin<E> nextEnd = null;
protected boolean traverserStepIdAndLabelsSetByChild = false;
protected Step<?, S> previousStep = EmptyStep.instance();
protected Step<E, ?> nextStep = EmptyStep.instance();
public AbstractStep(final Traversal.Admin traversal) {
this.traversal = traversal;
this.starts = new ExpandableStepIterator<>(this);
}
@Override
public void addStarts(final Iterator<Traverser.Admin<S>> starts) {
this.starts.add(starts);
}
@Override
public void addStart(final Traverser.Admin<S> start) {
this.starts.add(start);
}
最重要的是引入了starts的结构,为ExpandableStepIterator, 我们知道一个step 必须要初始化他的start 我们才能运行起来;
start 决定了step的开始节点结合; addStart就是把元素放入到starts结构里面;
接下来看 hasNext方法,就是这个step 是否已经执行完,没有执行完,就继续:
@Override
public boolean hasNext() {
if (null != this.nextEnd)
return true;
else {
try {
while (true) {
if (Thread.interrupted()) throw new TraversalInterruptedException();
this.nextEnd = this.processNextStart();
if (null != this.nextEnd.get() && 0 != this.nextEnd.bulk())
return true;
else
this.nextEnd = null;
}
} catch (final NoSuchElementException e) {
return false;
}
}
}
this.nextEnd = this.processNextStart();
通过nextEnd来判断,没处理一个 start 节点,就会有一个 nextEnd结果,知道NoSuchElement为止;
对于ProcessNextStart,我们看到是由各个具体的step自己来实现的;
举个例子 g.V().outE()
系统首先把gremlin 编译为一个DefaultTraversal, Traversal 是一个iterator 迭代器, 有一个filler负责迭代收集结果:
Traveral 的step-chains
[HugeGraphStep(vertex,[]), HugeVertexStep(OUT,edge)]
@Override
public boolean hasNext() {
if (!this.locked) this.applyStrategies();
return this.lastTraverser.bulk() > 0L || this.finalEndStep.hasNext();
}
finalEndStep 就是 HugeVertexStep: outE()
第一步就是调用 hasNext获取 结果,然后调用this.nextEnd = this.processNextStart();
第一次的调用时候,这是一个空的step; 因为vertexStep 集成了flatMap,所以进入了FlatMap的方法
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
if (this.iterator.hasNext()) {
return this.head.split(this.iterator.next(), this);
} else {
closeIterator();
this.head = this.starts.next();
this.iterator = this.flatMap(this.head);
}
}
}
this.head = this.starts.next();
我们之前说到 starts是一个extendedIterator,我们看看他的next方法:
public Traverser.Admin<S> next() {
if (!this.traverserSet.isEmpty())
return this.traverserSet.remove();
/////////////
if (this.hostStep.getPreviousStep().hasNext())
return this.hostStep.getPreviousStep().next();
/////////////
return this.traverserSet.remove();
}
调用了是前一步的next方法,通过这里我们就感觉到了,一个step 是否和被递归调用,通过 尾部的step ,尝试去获得结果,没有就调用它的上一层step去获得结果;
对于这个case 就是会调用 graphStep:
graphStep的关键:
protected Traverser.Admin<E> processNextStart() {
while (true) {
if (this.iterator.hasNext()) {
return this.isStart ? this.getTraversal().getTraverserGenerator().generate(this.iterator.next(), (Step) this, 1l) : this.head.split(this.iterator.next(), this);
} else {
if (this.isStart) {
if (this.done)
throw FastNoSuchElementException.instance();
else {
this.done = true;
this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
}
} else {
this.head = this.starts.next();
this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
}
}
}
}
this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
我们看到,最后是通过IteratorSupplier。get方法获取元素, 那么这个iteratorSupplier就是关键;
回到hugegraph,我们看到hugegraph重写了 HugeGraphStep,本质就是提供这个supplier,从自己的底层获取开始遍历的节点:
这个从低向上不断拉取得过程就是整个traversal 执行大致流程;
以HugeCountGraph 为例,解析如何定制化step
hugegraph 在0.11版本加入了 hugeCount step, 优化了g.V().count()这样的step,可以去后端直接查询统计数据,而不用遍历整个图;
我们看看是怎么做的:
首先需要一个一个strategy,把原油的globalCount换成HugeCount;
public void apply(Traversal.Admin<?, ?> traversal) {
TraversalUtil.convAllHasSteps(traversal);
// Extract CountGlobalStep
List<CountGlobalStep> steps = TraversalHelper.getStepsOfClass(
CountGlobalStep.class, traversal);
if (steps.isEmpty()) {
return;
}
// Find HugeGraphStep before count()
CountGlobalStep<?> originStep = steps.get(0);
List<Step<?, ?>> originSteps = new ArrayList<>();
HugeGraphStep<?, ? extends Element> graphStep = null;
Step<?, ?> step = originStep;
do {
if (!(step instanceof CountGlobalStep ||
step instanceof GraphStep ||
step instanceof IdentityStep ||
step instanceof NoOpBarrierStep ||
step instanceof CollectingBarrierStep) ||
(step instanceof TraversalParent &&
TraversalHelper.anyStepRecursively(s -> {
return s instanceof SideEffectStep ||
s instanceof AggregateStep;
}, (TraversalParent) step))) {
return;
}
originSteps.add(step);
if (step instanceof HugeGraphStep) {
graphStep = (HugeGraphStep<?, ? extends Element>) step;
break;
}
step = step.getPreviousStep();
} while (step != null);
if (graphStep == null) {
return;
}
// Replace with HugeCountStep
graphStep.queryInfo().aggregate(AggregateFunc.COUNT, null);
HugeCountStep<?> countStep = new HugeCountStep<>(traversal, graphStep);
for (Step<?, ?> origin : originSteps) {
traversal.removeStep(origin);
}
traversal.addStep(0, countStep);
}
我们看到 目前只处理g.V().count()或者g.E().count()这种在 graphStep层面上的count,其他模式的count不支持;
接下里,我们看看countStep 是如何实现的。
@Override
protected Admin<Long> processNextStart() throws NoSuchElementException {
if (this.done) {
throw FastNoSuchElementException.instance();
}
this.done = true;
@SuppressWarnings({ "unchecked", "rawtypes" })
Step<Long, Long> step = (Step) this;
return this.getTraversal().getTraverserGenerator()
.generate(this.originGraphStep.count(), step, 1L);
}
我们看到 processNextStart的方法
- 至调用一次,然后就是 FastNoSuchElementException,告诉下游,没有元素了
- 返回count结果,并生成traverser返回给下游;
从整个模型来看,是一个比较典型的从底向上的地鬼调用,我们可以修改 其中的step,做相应的优化。
比如 对于 out().out() 这种两调的查询,我们可以做一个 twohupStep,然后 batch的方式拿回数据,生成traverser。减少目前的onebyone的迭代消耗;
对于之前paypal在infoQ提到的多线程模型查询,目前看,不太清楚是在哪个层面,实在生成多个 traverser,然后并发去拉取? 还是在 step internal 去并发访问存储?
后续继续探索;
网友评论