美文网首页
GraphX + Java + 广度优先算法

GraphX + Java + 广度优先算法

作者: NazgulSun | 来源:发表于2021-04-13 16:50 被阅读0次

给定一个图和一个种子节点,遍历与种子节点连通的所有节点

        // Scala ClassTags for the edge (String) and vertex (Double) attribute types;
        // the GraphX calls below take them as explicit arguments when invoked from Java.
        ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
        ClassTag<Double> doubleTag = scala.reflect.ClassTag$.MODULE$.apply(Double.class);

        // Sample edge list: vertices 1-4 are linked to each other, while 5-6 form a
        // separate pair that shares no edge with vertex 1.
        List<Edge<String>> edges = new ArrayList<>();
        edges.add(new Edge<String>(1L, 2L, "Friend1"));
        edges.add(new Edge<String>(1L, 4L, "Friend2"));
        edges.add(new Edge<String>(2L, 4L, "Friend3"));
        edges.add(new Edge<String>(3L, 1L, "Friend4"));
        edges.add(new Edge<String>(3L, 4L, "Friend5"));
        edges.add(new Edge<String>(5L, 6L, "Friend6"));


        // Seed vertex id.
        // NOTE(review): rootId is not referenced again in the visible lines of this method.
        Long rootId = 1L;


        // NOTE(review): `sc` is defined outside the visible lines (presumably a JavaSparkContext) - confirm.
        JavaRDD<Edge<String>> edgesRDD = sc.parallelize(edges);

        // Build the graph from the edge RDD; Double.POSITIVE_INFINITY is passed as the
        // default vertex attribute.
        Graph<Double, String> graph = Graph.fromEdges(edgesRDD.rdd(), Double.POSITIVE_INFINITY,
                StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),doubleTag,stringTag);

        // mapVertices takes a type-equality evidence argument; obtain one from reflexivity()
        // and cast it to the Double/Double instantiation.
        Object obj = reflexivity();
        scala.Predef.$eq$colon$eq<Double,Double> tpEqus = ( scala.Predef.$eq$colon$eq<Double,Double>) obj;
        // Re-initialise every vertex attribute through SimpleInitGraph before running Pregel.
        Graph<Double, String> ready = graph.mapVertices(new SimpleInitGraph(),doubleTag,tpEqus);

        // Print the initialised vertices.
        ready.vertices().toJavaRDD().foreach(v->{
           System.out.println(v);
        } );

        // Run Pregel: initial message = +Infinity, at most 100 iterations, active direction
        // Either, with VProj as the vertex program, SendMsg as the message sender and
        // ReduceMsg as the message combiner.
        Graph<Double, String> bfs = ready.ops().pregel(Double.POSITIVE_INFINITY, 100, EdgeDirection.Either(),
                new VProj(), new SendMsg(), new ReduceMsg(),doubleTag );

        // Print the vertices of the graph returned by Pregel.
        bfs.vertices().toJavaRDD().foreach(v->{
            System.out.println(v);
        } );

    }

    /**
     * Returns Scala's type-equality evidence ({@code T =:= T}) obtained from
     * {@code Predef.=:=.tpEquals()}.
     *
     * @param <T> the type the evidence is requested for
     * @return the evidence object produced by {@code tpEquals()}
     */
    public static <T> scala.Predef.$eq$colon$eq<T, T> reflexivity() {
        final scala.Predef.$eq$colon$eq<T, T> evidence =
                scala.Predef.$eq$colon$eq$.MODULE$.tpEquals();
        return evidence;
    }


    /**
     * Vertex initialiser for the BFS: the seed vertex gets {@code 0.0}, every other
     * vertex gets {@code Double.POSITIVE_INFINITY}.
     *
     * <p>The seed id can be supplied through the constructor; the no-argument
     * constructor keeps the previous behaviour of seeding vertex {@code 1}.
     */
    static class SimpleInitGraph extends AbstractFunction2<Object,Double,Double> implements Serializable {

        /** Seed vertex id used by the no-argument constructor. */
        private static final long DEFAULT_ROOT_ID = 1L;

        /** Id of the vertex whose value is initialised to {@code 0.0}. */
        private final long rootId;

        /** Creates an initialiser that seeds vertex {@code 1}. */
        SimpleInitGraph() {
            this(DEFAULT_ROOT_ID);
        }

        /**
         * Creates an initialiser that seeds the given vertex.
         *
         * @param rootId id of the vertex to initialise with {@code 0.0}
         */
        SimpleInitGraph(long rootId) {
            this.rootId = rootId;
        }

        /**
         * @param v1 the vertex id; cast to {@code Long}
         * @param v2 the vertex's current attribute (not used)
         * @return {@code 0.0} for the seed vertex, {@code Double.POSITIVE_INFINITY} otherwise
         */
        @Override
        public Double apply(Object v1, Double v2) {
            // Compare primitive values so the check never degrades to a reference comparison.
            if (((Long) v1).longValue() == rootId) {
                return 0.0;
            }
            return Double.POSITIVE_INFINITY;
        }
    }

    /**
     * Vertex program passed to Pregel: the new vertex value is the smaller of the
     * vertex's current value and the incoming message.
     */
    static class VProj extends AbstractFunction3<Object,Double,Double,Double> implements Serializable {

        /**
         * @param v1 the vertex id (not used)
         * @param v2 the vertex's current value
         * @param v3 the incoming (already combined) message
         * @return {@code Math.min(v2, v3)}
         */
        @Override
        public Double apply(Object v1, Double v2, Double v3) {
            final double current = v2;
            final double incoming = v3;
            return Math.min(current, incoming);
        }
    }

    /**
     * Message sender passed to Pregel for the BFS.
     *
     * <p>A vertex counts as "marked" once its value is no longer
     * {@code Double.POSITIVE_INFINITY}. A message is emitted only when exactly one
     * endpoint of the edge is marked: the unmarked endpoint is sent the marked
     * endpoint's value plus one. When both or neither endpoint is marked, no message
     * is produced, so edges that the frontier has not reached (or has already passed)
     * stay silent and Pregel can stop as soon as no new vertex is discovered.
     */
    static class SendMsg extends AbstractFunction1<EdgeTriplet<Double,String>, scala.collection.Iterator<Tuple2<Object,Double>>>
            implements Serializable{

        /**
         * @param v1 the edge triplet carrying both endpoint ids and values
         * @return an iterator with at most one (target vertex id, distance) message
         */
        @Override
        public scala.collection.Iterator<Tuple2<Object, Double>> apply(EdgeTriplet<Double, String> v1) {
            List<Tuple2<Object, Double>> res = new ArrayList<>();
            boolean isSrcMarked = v1.srcAttr() != Double.POSITIVE_INFINITY;
            boolean isDstMarked = v1.dstAttr() != Double.POSITIVE_INFINITY;
            if (isSrcMarked && !isDstMarked) {
                // Frontier moves along the edge direction: source -> destination.
                res.add(new Tuple2<>(v1.dstId(),v1.srcAttr()+1));
            } else if (isDstMarked && !isSrcMarked) {
                // Frontier moves against the edge direction: destination -> source.
                res.add(new Tuple2<>(v1.srcId(),v1.dstAttr()+1));
            }
            // Neither marked: nothing to propagate (previously an Infinity message was sent
            // to the source, which kept unreachable components active every superstep).
            return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
        }
    }

    /**
     * Message combiner passed to Pregel: of two messages addressed to the same
     * vertex, the smaller one is kept.
     */
    static class ReduceMsg extends AbstractFunction2<Double,Double,Double> implements Serializable {

        /**
         * @param v1 first message
         * @param v2 second message
         * @return {@code Math.min(v1, v2)}
         */
        @Override
        public Double apply(Double v1, Double v2) {
            final double smaller = Math.min(v1, v2);
            return smaller;
        }
    }

相关文章

网友评论

      本文标题:GraphX + Java + 广度优先算法

      本文链接:https://www.haomeiwen.com/subject/njsclltx.html