给定一个图 + 种子节点，遍历连通的节点
// Driver statements: build a small sample graph, mark a seed vertex, then run Pregel over it
// and print the vertices before and after.

// ClassTags for the edge attribute type (String) and vertex attribute type (Double);
// they are passed explicitly to the GraphX calls below.
ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
ClassTag<Double> doubleTag = scala.reflect.ClassTag$.MODULE$.apply(Double.class);
// Sample edges: vertices 1-4 are linked to one another, 5 -> 6 is a separate pair.
List<Edge<String>> edges = new ArrayList<>();
edges.add(new Edge<String>(1L, 2L, "Friend1"));
edges.add(new Edge<String>(1L, 4L, "Friend2"));
edges.add(new Edge<String>(2L, 4L, "Friend3"));
edges.add(new Edge<String>(3L, 1L, "Friend4"));
edges.add(new Edge<String>(3L, 4L, "Friend5"));
edges.add(new Edge<String>(5L, 6L, "Friend6"));
// NOTE(review): rootId is not read anywhere in the visible code; SimpleInitGraph compares
// against the literal 1 instead.
Long rootId = 1L;
// `sc` is defined outside this snippet - presumably a JavaSparkContext; verify against the caller.
JavaRDD<Edge<String>> edgesRDD = sc.parallelize(edges);
// Build the graph from the edge RDD with Double.POSITIVE_INFINITY as the default vertex value.
Graph<Double, String> graph = Graph.fromEdges(edgesRDD.rdd(), Double.POSITIVE_INFINITY,
StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),doubleTag,stringTag);
// mapVertices is given an explicit `=:=` evidence argument; it is obtained from reflexivity()
// and cast to the Double/Double instantiation.
Object obj = reflexivity();
scala.Predef.$eq$colon$eq<Double,Double> tpEqus = ( scala.Predef.$eq$colon$eq<Double,Double>) obj;
// Re-map vertex values with SimpleInitGraph: vertex 1 becomes 0.0, all others +Infinity.
Graph<Double, String> ready = graph.mapVertices(new SimpleInitGraph(),doubleTag,tpEqus);
// Print every vertex of the initialised graph.
ready.vertices().toJavaRDD().foreach(v->{
System.out.println(v);
} );
// Run Pregel with initial message +Infinity, a limit of 100 iterations and
// EdgeDirection.Either, using VProj / SendMsg / ReduceMsg as the three callbacks.
Graph<Double, String> bfs = ready.ops().pregel(Double.POSITIVE_INFINITY, 100, EdgeDirection.Either(),
new VProj(), new SendMsg(), new ReduceMsg(),doubleTag );
// Print every vertex of the resulting graph.
bfs.vertices().toJavaRDD().foreach(v->{
System.out.println(v);
} );
}
/**
 * Obtains Scala's {@code =:=} type-equality evidence relating a type to itself.
 *
 * @param <T> the type used on both sides of the evidence
 * @return the value produced by {@code Predef.=:=.tpEquals()}
 */
public static <T> scala.Predef.$eq$colon$eq<T, T> reflexivity() {
    final scala.Predef.$eq$colon$eq<T, T> evidence = scala.Predef.$eq$colon$eq$.MODULE$.tpEquals();
    return evidence;
}
/**
 * Vertex initialiser: gives the seed vertex the value {@code 0.0} and every other vertex
 * {@code Double.POSITIVE_INFINITY}.
 *
 * <p>The seed id is configurable through the constructor; the no-arg constructor keeps the
 * previous behaviour of using vertex {@code 1} as the seed.
 */
static class SimpleInitGraph extends AbstractFunction2<Object,Double,Double> implements Serializable {
    /** Seed used by the no-arg constructor. */
    private static final long DEFAULT_ROOT_ID = 1L;

    /** Id of the vertex that is initialised to 0.0. */
    private final long rootId;

    /** Creates an initialiser whose seed is vertex {@code 1}. */
    SimpleInitGraph() {
        this(DEFAULT_ROOT_ID);
    }

    /**
     * Creates an initialiser for the given seed vertex.
     *
     * @param rootId id of the vertex to initialise to {@code 0.0}
     */
    SimpleInitGraph(long rootId) {
        this.rootId = rootId;
    }

    /**
     * @param v1 the vertex id; cast to {@code Long}
     * @param v2 the vertex's previous value (not used)
     * @return {@code 0.0} for the seed vertex, otherwise {@code Double.POSITIVE_INFINITY}
     */
    @Override
    public Double apply(Object v1, Double v2) {
        // Compare as primitives so the check never degrades into a boxed reference comparison.
        if (((Long) v1).longValue() == rootId) {
            return 0.0;
        }
        return Double.POSITIVE_INFINITY;
    }
}
/**
 * Three-argument function passed to {@code pregel} as its first callback: returns the
 * smaller of its second and third arguments and ignores the first.
 */
static class VProj extends AbstractFunction3<Object,Double,Double,Double> implements Serializable {
    /**
     * @param vertexId not used
     * @param current  first value to compare
     * @param incoming second value to compare
     * @return {@code Math.min(current, incoming)}
     */
    @Override
    public Double apply(Object vertexId, Double current, Double incoming) {
        final double smaller = Math.min(current, incoming);
        return smaller;
    }
}
/**
 * Message generator for the Pregel run: for an edge with exactly one marked endpoint
 * (value other than {@code Double.POSITIVE_INFINITY}) it emits one message addressed to the
 * unmarked endpoint, carrying the marked endpoint's value plus one.
 *
 * <p>Edges whose endpoints are both marked or both unmarked emit nothing. In the
 * both-unmarked case a message could only carry +Infinity, which cannot change any vertex
 * value but would still count as a message and keep the iteration going.
 */
static class SendMsg extends AbstractFunction1<EdgeTriplet<Double,String>, scala.collection.Iterator<Tuple2<Object,Double>>>
        implements Serializable {
    /**
     * @param v1 the edge triplet whose source and destination values are inspected
     * @return a Scala iterator over zero or one {@code (targetVertexId, value)} messages
     */
    @Override
    public scala.collection.Iterator<Tuple2<Object, Double>> apply(EdgeTriplet<Double, String> v1) {
        List<Tuple2<Object, Double>> res = new ArrayList<>();
        double srcValue = v1.srcAttr();
        double dstValue = v1.dstAttr();
        boolean isSrcMarked = srcValue != Double.POSITIVE_INFINITY;
        boolean isDstMarked = dstValue != Double.POSITIVE_INFINITY;
        if (isSrcMarked && !isDstMarked) {
            // Propagate from the marked source to the unmarked destination.
            res.add(new Tuple2<>(v1.dstId(), srcValue + 1));
        } else if (isDstMarked && !isSrcMarked) {
            // Propagate from the marked destination to the unmarked source.
            res.add(new Tuple2<>(v1.srcId(), dstValue + 1));
        }
        return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
    }
}
/**
 * Two-argument function passed to {@code pregel} as its third callback: returns the
 * smaller of its two arguments.
 */
static class ReduceMsg extends AbstractFunction2<Double,Double,Double> implements Serializable {
    /**
     * @param left  first value
     * @param right second value
     * @return {@code Math.min(left, right)}
     */
    @Override
    public Double apply(Double left, Double right) {
        final double smaller = Math.min(left, right);
        return smaller;
    }
}
网友评论