美文网首页
Graphx 查询图中的环

Graphx 查询图中的环

作者: NazgulSun | 来源:发表于2021-08-23 17:19 被阅读0次

    图中环路检测

    一般是三色标记,使用dfs 算法; 三种状态:

    • 未访问 visited =0
    • 访问,但子节点未访问完 visited=1
    • 全部访问,visited =2;
      如果在访问过程中, dfs 下一个节点,为visited=1,则 有环;

    使用graphx 实现

    graphx 一般只能实现基于消息机制的 bfs 算法
    在我们的实现中,为每个 node,记录 路径, 然后看下一个节点,是否包含在路径中,如果包含则有图;

    每个节点,需要保存路径,内存开销很大,目前还没有找到好的算法实现;
    参考,scala 版本
    https://codeleading.com/article/35132638895/

    java 版本:

            public static void main(String[] args) {
    
            SparkConf conf = new SparkConf().setAppName("Graphx Learning");
            conf.setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            ClassTag<Long> LongTag = scala.reflect.ClassTag$.MODULE$.apply(Long.class);
            ClassTag<List<Long>> ListLongTag = scala.reflect.ClassTag$.MODULE$.apply(List.class);
    
                ClassTag<HashMap<Object,Long>> mapTag = scala.reflect.ClassTag$.MODULE$.apply(Map.class);
    
            List<String> edges = Lists.newArrayList(
         "1 2",
                    "2 3",
                    "3 4",
                    "4 5",
                    "5 1",
                    "5 3",
                    "6 7",
                    "7 6",
                    "7 8",
                    "8 7",
                    "1 9",
                    "9 1"
            );
    
    
            JavaRDD<Edge<Long>> rddEdges = sc.parallelize(edges).map(x->
                     new Edge(Long.parseLong(x.split(" ")[0]),
                             Long.parseLong(x.split(" ")[1]),1L));
    
            Graph<List<Long>,Long> graph = Graph.fromEdges(rddEdges.rdd(),new ArrayList<>(),
                    StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),ListLongTag,LongTag);
    
            Object obj = reflexivity();
            Predef.$eq$colon$eq<List<Long>,List<Long>> tpEqus = ( Predef.$eq$colon$eq<List<Long>,List<Long>>) obj;
            Graph<List<Long>, Long> ready = graph.mapVertices(new SimpleInitGraph(),ListLongTag,tpEqus);
    
            Graph<List<Long>,Long> LPA = ready.ops().pregel(new ArrayList<>(),
                    10, EdgeDirection.Either(),
                    new VProj(), new SendMsg(), new ReduceMsg(),ListLongTag );
            LPA.vertices().toJavaRDD()
                   // .filter(x->x._2.get(0).equals(x._2.get(x._2.size()-1)))
                    .foreach(x->{
                System.out.println(x._1+"\t"+x._2);
            });
        }
    
    
        static public <T> Predef.$eq$colon$eq<T, T> reflexivity() {
            return Predef.$eq$colon$eq$.MODULE$.tpEquals();
        }
    
    
        static class SimpleInitGraph extends AbstractFunction2<Object,List<Long>,List<Long>> implements Serializable {
    
            @Override
            public List<Long> apply(Object v1, List<Long> v2) {
                return new ArrayList<>();
            }
        }
    
        static class VProj extends AbstractFunction3<Object,List<Long>,List<Long>,List<Long>> implements Serializable {
    
    
            @Override
            public List<Long> apply(Object v1, List<Long> v2, List<Long> v3) {
                List<Long> res = new ArrayList<>();
                res.addAll(v3);
                return res;
            }
        }
    
        static class SendMsg extends AbstractFunction1<EdgeTriplet<List<Long>,Long>, Iterator<Tuple2<Object,List<Long>>>>
                implements Serializable{
    
            @Override
            public Iterator<Tuple2<Object, List<Long>>> apply(EdgeTriplet<List<Long>, Long> v1) {
                List<Tuple2<Object,List<Long>>> res = new ArrayList<>();
                if(v1.srcAttr().size() == 0){
                    Object id = v1.dstId();
                    List<Long> src_des = Lists.newArrayList(v1.srcId(),v1.dstId());
                    Tuple2<Object,List<Long>> path = new Tuple2<>(id, src_des);
                    res.add(path);
                    return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
                }else{
                    Long desId = v1.dstId();
                    List<Long> oldPath = v1.srcAttr();
    
                    for(int i=1;i<oldPath.size();i++) {
                        if (oldPath.get(i).equals(desId)) {
                            //find a circle, don't repeat
                            System.out.println("find a circle " + oldPath + "\t" + desId );
                            return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
                        }
                    }
                    Object id = v1.dstId();
                    List<Long> src_des = new ArrayList<>();
                    src_des.addAll(oldPath);
                    src_des.add(desId);
                    Tuple2<Object,List<Long>> path = new Tuple2<>(id, src_des);
                    res.add(path);
                    return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
    
                }
    
            }
        }
    
        static class ReduceMsg extends AbstractFunction2<List<Long>,List<Long>,List<Long>>
                                implements Serializable {
    
            @Override
            public List<Long> apply(List<Long> v1, List<Long> v2) {
                return v1;
            }
        }
    

    相关文章

      网友评论

          本文标题:Graphx 查询图中的环

          本文链接:https://www.haomeiwen.com/subject/hculiltx.html