Pregel 模型
graphX = graphX.ops().pregel(new ArrayList<String>(),
20, EdgeDirection.Either(),
new VProj(), new SendMsg(), new ReduceMsg(),msgClassTag );
有几个参数需要主要的地方:
EdgeDirection:
- @param activeDirection the direction of edges incident to a vertex that received a message in
- the previous round on which to run
sendMsg
. For example, if this isEdgeDirection.Out
, only - out-edges of vertices that received a message in the previous round will run.
比如有三条边: A->B->C
如果上一轮迭代中, B收到了MSG,那么下一轮迭代的时候会迭代哪些边呢?
这个就是有 EdgeDirection控制;
如果 为 Out,那么 就迭代 B-C;
如果为in,那么就叠戴 A-B,
如果是either,那么迭代 A-B, B-C
如果为both,就不迭代,both的意识是 只有 A-B两个定点 收到消息才会迭代这条边;
Iteration: 为迭代的次数, 一次迭代是指,途中所有节点已经不再更新自己的消息;
每次迭代时候,使用sendMsg 发送消息;
如果对同一个定点发送了多条消息,就会使用 mergeMsg 来合并消息;
而 Vprog,就是节点收到消息之后,做的动作,这个地方有个比较trick 的地方
节点,收到消息之手,会更新属性, 这个更新操作决定了, 这个节点是否继续做迭代;
static class VProj extends AbstractFunction3<Object,SpkVertex,List<String>,SpkVertex> implements Serializable {
/**
* 需要使用newV,VProj 貌似会比较 oldV v2 和 mergeValue 之后的值,如果发生了改变,就认为会认为收到有效的msg,可以继续迭代;
* @param v1
* @param v2
* @param msg
* @return
*/
@Override
public SpkVertex apply(Object v1, SpkVertex v2, List<String> msg) {
//update the old path
SpkVertex newV = new SpkVertex(v2.getLabel());
newV.setProperties(v2.getProperties());
newV.getProperties().put("path", new ArrayList<>(msg));
return newV;
}
}
我的属性为 spkVertex,最开始我用 老的节点,更新属性,然后返回 v2 本身;
而没有正确实现 spkvertex 的 equal & hash 方法; 使得更新了属性之后,还认为是同一个属性;
程序认为,这个消息之后,属性没有更新,所以达到了停止迭代的条件;
正确的做法,就是 正确的实现 equals,hash方法, 我这里直接返回一个新的 vertex对象; 那么就会持续的迭代数据;
这个地方,调试了很久才出坑;
网友评论