图中环路检测
一般使用 DFS 配合三色标记法，节点有三种状态：
- 未访问：visited = 0
- 已访问，但其子节点尚未全部访问完（仍在当前 DFS 路径上）：visited = 1
- 自身及所有子节点都已访问完：visited = 2
如果在 DFS 访问过程中，下一个节点的状态为 visited = 1（即它仍在当前 DFS 路径上），则说明图中有环。
使用 GraphX 实现
GraphX（Pregel API）一般只能实现基于消息传递的类 BFS 算法，不便直接实现 DFS 三色标记。
在我们的实现中，为每个节点记录一条路径，然后检查下一个节点是否已包含在该路径中；如果包含，则说明有环。
由于每个节点都需要保存路径，内存开销很大；目前还没有找到更好的算法实现。
参考（Scala 版本）：
https://codeleading.com/article/35132638895/
Java 版本：
/**
 * Demo driver: builds a small directed graph, runs a GraphX Pregel job that propagates paths
 * along edges, and prints the path each vertex holds when the job finishes.
 *
 * <p>Each vertex attribute is a path (a list of vertex ids). A vertex whose path starts and ends
 * with the same id has closed a cycle. {@link SendMsg} also prints "find a circle ..." whenever it
 * refuses to extend a path that would revisit a vertex.
 */
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Graphx Learning");
    conf.setMaster("local");
    // JavaSparkContext is Closeable: try-with-resources stops the context even if the job fails.
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
        // ClassTags that Scala would supply implicitly must be passed explicitly from Java.
        ClassTag<Long> longTag = scala.reflect.ClassTag$.MODULE$.apply(Long.class);
        ClassTag<List<Long>> pathTag = scala.reflect.ClassTag$.MODULE$.apply(List.class);

        // Directed edges "src dst". The list contains the cycles 1-2-3-4-5-1, 3-4-5-3, 6-7-6,
        // 7-8-7 and 1-9-1.
        List<String> edges = Lists.newArrayList(
                "1 2",
                "2 3",
                "3 4",
                "4 5",
                "5 1",
                "5 3",
                "6 7",
                "7 6",
                "7 8",
                "8 7",
                "1 9",
                "9 1"
        );
        JavaRDD<Edge<Long>> rddEdges = sc.parallelize(edges).map(line -> {
            // Split once per line instead of once per endpoint.
            String[] endpoints = line.split(" ");
            return new Edge<Long>(Long.parseLong(endpoints[0]), Long.parseLong(endpoints[1]), 1L);
        });

        Graph<List<Long>, Long> graph = Graph.fromEdges(rddEdges.rdd(), new ArrayList<>(),
                StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(), pathTag, longTag);

        // mapVertices expects VD =:= VD2 evidence as its last argument; infer it directly
        // instead of going through an Object cast.
        Predef.$eq$colon$eq<List<Long>, List<Long>> sameType = reflexivity();
        Graph<List<Long>, Long> ready = graph.mapVertices(new SimpleInitGraph(), pathTag, sameType);

        // NOTE(review): SendMsg only ever sends src -> dst, so EdgeDirection.Out() may be the
        // tighter choice; Either() also re-runs edges where only the destination changed.
        // Kept as-is to preserve the original output — confirm before changing.
        Graph<List<Long>, Long> paths = ready.ops().pregel(new ArrayList<>(),
                10, EdgeDirection.Either(),
                new VProj(), new SendMsg(), new ReduceMsg(), pathTag);

        // To print only vertices that hold a closed cycle (path starts and ends with the same id),
        // insert a filter such as:
        //   .filter(x -> !x._2().isEmpty() && x._2().get(0).equals(x._2().get(x._2().size() - 1)))
        paths.vertices().toJavaRDD()
                .foreach(x -> System.out.println(x._1() + "\t" + x._2()));
    }
}
/**
 * Returns Scala's reflexive type-equality evidence {@code T =:= T}, taken from {@code Predef}.
 *
 * <p>Scala normally supplies this value implicitly; Java callers must pass it by hand (the demo
 * passes it as the last argument of {@code mapVertices}).
 *
 * @param <T> the type that is witnessed as equal to itself
 * @return the {@code T =:= T} evidence instance
 */
public static <T> Predef.$eq$colon$eq<T, T> reflexivity() {
    Predef.$eq$colon$eq<T, T> sameType = Predef.$eq$colon$eq$.MODULE$.tpEquals();
    return sameType;
}
/**
 * Vertex initializer for {@code mapVertices}.
 *
 * <p>Gives every vertex its own fresh, empty, mutable path list. Both the vertex id and the
 * existing attribute are ignored.
 */
static class SimpleInitGraph extends AbstractFunction2<Object, List<Long>, List<Long>>
        implements Serializable {
    @Override
    public List<Long> apply(Object vertexId, List<Long> ignoredAttr) {
        List<Long> emptyPath = new ArrayList<Long>();
        return emptyPath;
    }
}
/**
 * Pregel vertex program.
 *
 * <p>Replaces the vertex's path with a copy of the merged incoming message. The vertex id and the
 * current path are ignored. In the first superstep the message is the initial message passed to
 * {@code pregel} (an empty list in the demo), so every vertex starts with an empty path.
 */
static class VProj extends AbstractFunction3<Object, List<Long>, List<Long>, List<Long>>
        implements Serializable {
    @Override
    public List<Long> apply(Object vertexId, List<Long> currentPath, List<Long> incomingPath) {
        // Copy so the vertex never aliases the list carried by the message.
        return new ArrayList<>(incomingPath);
    }
}
/**
 * Pregel send function: proposes to each edge's destination a path ending at that destination.
 *
 * <ul>
 *   <li>If the source has no path yet, the proposal is {@code [srcId, dstId]}.
 *   <li>Otherwise the destination is appended to a copy of the source's path.
 *   <li>If the destination already appears anywhere except the first position of the source's
 *       path, a "find a circle" line is printed and no message is sent.
 * </ul>
 *
 * <p>Skipping the first position lets a path return to its starting vertex (closing a cycle) once.
 * After that, the start vertex also sits at a later position, so further revisits are blocked.
 */
static class SendMsg extends AbstractFunction1<EdgeTriplet<List<Long>, Long>, Iterator<Tuple2<Object, List<Long>>>>
        implements Serializable {
    @Override
    public Iterator<Tuple2<Object, List<Long>>> apply(EdgeTriplet<List<Long>, Long> triplet) {
        List<Tuple2<Object, List<Long>>> outbox = new ArrayList<>();
        List<Long> srcPath = triplet.srcAttr();
        Long target = triplet.dstId();

        List<Long> proposal;
        if (srcPath.isEmpty()) {
            proposal = Lists.newArrayList(triplet.srcId(), triplet.dstId());
        } else {
            for (Long visited : srcPath.subList(1, srcPath.size())) {
                if (visited.equals(target)) {
                    // Extending would revisit a vertex: report it and send nothing.
                    System.out.println("find a circle " + srcPath + "\t" + target);
                    return JavaConverters.asScalaIteratorConverter(outbox.iterator()).asScala();
                }
            }
            proposal = new ArrayList<>(srcPath);
            proposal.add(target);
        }

        outbox.add(new Tuple2<Object, List<Long>>(target, proposal));
        return JavaConverters.asScalaIteratorConverter(outbox.iterator()).asScala();
    }
}
/**
 * Pregel merge function: when a vertex receives several candidate paths in one superstep, keeps
 * exactly one of them. All other candidates are discarded.
 *
 * <p>GraphX requires the merge function to be commutative and associative. Returning an arbitrary
 * argument (for example, always the first) is not commutative, so the kept path would depend on
 * message order and partitioning. This version keeps the minimum under a fixed total order:
 * shorter path first, then element-wise by vertex id. That choice is commutative, associative and
 * deterministic.
 */
static class ReduceMsg extends AbstractFunction2<List<Long>, List<Long>, List<Long>>
        implements Serializable {
    @Override
    public List<Long> apply(List<Long> v1, List<Long> v2) {
        return comparePaths(v1, v2) <= 0 ? v1 : v2;
    }

    /** Total order on paths: by length, then lexicographically by vertex id. */
    private static int comparePaths(List<Long> a, List<Long> b) {
        int bySize = Integer.compare(a.size(), b.size());
        if (bySize != 0) {
            return bySize;
        }
        for (int i = 0; i < a.size(); i++) {
            int byId = a.get(i).compareTo(b.get(i));
            if (byId != 0) {
                return byId;
            }
        }
        return 0;
    }
}
网友评论