美文网首页
GraphX + Java + 广度优先算法

GraphX + Java + 广度优先算法

作者: NazgulSun | 来源:发表于2021-04-13 16:50 被阅读0次

    给定一个图和一个种子节点,遍历与种子节点连通的所有节点

            ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
            ClassTag<Double> doubleTag = scala.reflect.ClassTag$.MODULE$.apply(Double.class);
    
            List<Edge<String>> edges = new ArrayList<>();
            edges.add(new Edge<String>(1L, 2L, "Friend1"));
            edges.add(new Edge<String>(1L, 4L, "Friend2"));
            edges.add(new Edge<String>(2L, 4L, "Friend3"));
            edges.add(new Edge<String>(3L, 1L, "Friend4"));
            edges.add(new Edge<String>(3L, 4L, "Friend5"));
            edges.add(new Edge<String>(5L, 6L, "Friend6"));
    
    
            Long rootId = 1L;
    
    
            JavaRDD<Edge<String>> edgesRDD = sc.parallelize(edges);
    
            Graph<Double, String> graph = Graph.fromEdges(edgesRDD.rdd(), Double.POSITIVE_INFINITY,
                    StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),doubleTag,stringTag);
    
            Object obj = reflexivity();
            scala.Predef.$eq$colon$eq<Double,Double> tpEqus = ( scala.Predef.$eq$colon$eq<Double,Double>) obj;
            Graph<Double, String> ready = graph.mapVertices(new SimpleInitGraph(),doubleTag,tpEqus);
    
            ready.vertices().toJavaRDD().foreach(v->{
               System.out.println(v);
            } );
    
            Graph<Double, String> bfs = ready.ops().pregel(Double.POSITIVE_INFINITY, 100, EdgeDirection.Either(),
                    new VProj(), new SendMsg(), new ReduceMsg(),doubleTag );
    
            bfs.vertices().toJavaRDD().foreach(v->{
                System.out.println(v);
            } );
    
        }
    
        /**
         * Obtains Scala's type-equality evidence ({@code T =:= T}) from {@code Predef}
         * so it can be handed to Scala APIs that require it as an explicit argument
         * when called from Java.
         *
         * @param <T> the type for which the evidence is requested
         * @return the evidence object produced by {@code tpEquals()}
         */
        public static <T> scala.Predef.$eq$colon$eq<T, T> reflexivity() {
            final scala.Predef.$eq$colon$eq<T, T> evidence =
                    scala.Predef.$eq$colon$eq$.MODULE$.tpEquals();
            return evidence;
        }
    
    
        /**
         * Vertex initializer for the breadth-first search: the seed vertex gets
         * distance {@code 0.0}, every other vertex gets {@code Double.POSITIVE_INFINITY}
         * (meaning "not reached yet").
         *
         * <p>The seed id is configurable through {@link #SimpleInitGraph(long)}. The no-arg
         * constructor keeps the historical behaviour of seeding vertex {@code 1}.
         */
        static class SimpleInitGraph extends AbstractFunction2<Object,Double,Double> implements Serializable {

            /** Seed used by the no-arg constructor; equal to the previously hard-coded id. */
            private static final long DEFAULT_ROOT_ID = 1L;

            /** Id of the vertex the search starts from. */
            private final long rootId;

            /** Creates an initializer that seeds vertex {@code 1}. */
            SimpleInitGraph() {
                this(DEFAULT_ROOT_ID);
            }

            /**
             * Creates an initializer that seeds the given vertex.
             *
             * @param rootId id of the vertex whose distance is initialised to {@code 0.0}
             */
            SimpleInitGraph(long rootId) {
                this.rootId = rootId;
            }

            /**
             * @param v1 the vertex id; cast to {@code Long}
             * @param v2 the vertex's current attribute (ignored)
             * @return {@code 0.0} for the seed vertex, positive infinity otherwise
             */
            @Override
            public Double apply(Object v1, Double v2) {
                // Compare as primitive longs so the check never degrades to reference equality.
                if (((Long) v1).longValue() == rootId) {
                    return 0.0;
                }
                return Double.POSITIVE_INFINITY;
            }
        }
    
        /**
         * Vertex program: combines a vertex's current value with an incoming
         * message by keeping the smaller of the two.
         */
        static class VProj extends AbstractFunction3<Object,Double,Double,Double> implements Serializable {

            /**
             * @param v1 the vertex id (unused)
             * @param v2 the vertex's current value
             * @param v3 the incoming message value
             * @return the minimum of {@code v2} and {@code v3}
             */
            @Override
            public Double apply(Object v1, Double v2, Double v3) {
                final double current = v2;
                final double incoming = v3;
                final double smallest = Math.min(current, incoming);
                return smallest;
            }
        }
    
        /**
         * Message generator for the breadth-first search. A vertex counts as "marked"
         * once its attribute is no longer {@code Double.POSITIVE_INFINITY}.
         *
         * <p>A message is emitted only when exactly one endpoint of the edge is marked:
         * the marked endpoint's distance plus one is sent to the unmarked endpoint.
         * Nothing is sent when both endpoints are marked, or when neither is. Emitting
         * nothing in the "neither marked" case matters: sending an infinite distance
         * there would keep edges in unreachable components producing messages forever,
         * so the computation would only stop at the iteration cap instead of when the
         * frontier is exhausted.
         */
        static class SendMsg extends AbstractFunction1<EdgeTriplet<Double,String>, scala.collection.Iterator<Tuple2<Object,Double>>>
                implements Serializable{

            /**
             * @param v1 the edge triplet, carrying both endpoint ids and attributes
             * @return an iterator over zero or one {@code (targetVertexId, distance)} messages
             */
            @Override
            public scala.collection.Iterator<Tuple2<Object, Double>> apply(EdgeTriplet<Double, String> v1) {
                // At most one message is ever produced per edge.
                List<Tuple2<Object, Double>> res = new ArrayList<>(1);
                boolean isSrcMarked = v1.srcAttr() != Double.POSITIVE_INFINITY;
                boolean isDstMarked = v1.dstAttr() != Double.POSITIVE_INFINITY;
                // Exactly one endpoint reached: propagate its distance across the edge.
                if (isSrcMarked != isDstMarked) {
                    if (isSrcMarked) {
                        res.add(new Tuple2<>(v1.dstId(), v1.srcAttr() + 1));
                    } else {
                        res.add(new Tuple2<>(v1.srcId(), v1.dstAttr() + 1));
                    }
                }
                return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
            }
        }
    
        /**
         * Message combiner: merges two messages into one by keeping the smaller value.
         */
        static class ReduceMsg extends AbstractFunction2<Double,Double,Double> implements Serializable {

            /**
             * @param v1 the first message value
             * @param v2 the second message value
             * @return the minimum of {@code v1} and {@code v2}
             */
            @Override
            public Double apply(Double v1, Double v2) {
                final double first = v1;
                final double second = v2;
                final double smaller = Math.min(first, second);
                return smaller;
            }
        }
    
    

    相关文章

      网友评论

          本文标题:GraphX + Java + 广度优先算法

          本文链接:https://www.haomeiwen.com/subject/njsclltx.html