mapPartitionsWithIndex,这个算子可以拿到每个partition的index
PartitionsWithIndex {
/**
 * Demonstrates {@code mapPartitionsWithIndex}: builds a small RDD of student names split
 * into 2 partitions, appends a class number derived from each element's partition index
 * to the name, and prints every resulting string.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("MapPartitionsWithIndexJava").setMaster("local");
    JavaSparkContext sparkContext = new JavaSparkContext(conf);
    try {
        // Prepare some mock data.
        List<String> studentNames = Arrays.asList("张三", "李四", "王二", "麻子");
        // Request 2 partitions so the elements carry different partition indexes.
        JavaRDD<String> studentNamesRDD = sparkContext.parallelize(studentNames, 2);

        JavaRDD<String> studentWithClassRDD = studentNamesRDD.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer partitionIndex, Iterator<String> names) throws Exception {
                        List<String> studentWithClassList = new ArrayList<String>();
                        while (names.hasNext()) {
                            // Partition indexes start at 0; add 1 so class numbers start at 1.
                            studentWithClassList.add(names.next() + "_" + (partitionIndex + 1));
                        }
                        return studentWithClassList.iterator();
                    }
                },
                // Second argument is the preservesPartitioning flag of mapPartitionsWithIndex.
                true);

        // Print every tagged name, one per line.
        studentWithClassRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println("s = " + s);
            }
        });
    } finally {
        // Always shut the context down, even when the job above throws, so it is not leaked.
        sparkContext.stop();
    }
}
}
网友评论