美文网首页
Java Spark 简单示例(八) 自定义分区划分器

Java Spark 简单示例(八) 自定义分区划分器

作者: 憨人Zoe | 来源:发表于2018-09-28 18:20 被阅读0次

    大数据学习交流微信群

    今天花了大量时间来搜索分区划分器相关的资料。感觉大多雷同,无非是Spark 内置的两个分区划分器HashPartitionerRangePartitioner。但是关于RangePartitioner具体用法讲解的都很少。

    HashPartitioner 是绝大多数场景的默认分区划分器。RangePartitioner主要用于RDD的数据排序相关API中,比如sortByKey底层使用的就是RangePartitioner分区划分器。

    我将通过下面demo8演示各个分区划分器的用法以及自定义分区器(第9步)。

    1.test2.txt中创建RDD
    2. 打印初始分区数,分区划分器
    3. 调用flatMapToPair生成包含<K,V>结构的数据的RDD
    4. 打印flatMapToPair后分区数,分区划分器

    //test2.txt
    aaa,bbb,ccc,ddd,eee,fff,ggg
    aaa,ccc,eee
    
    public class demo8 {
        private static String appName = "spark.demo";
        private static String master = "local[*]";
    
        public static void main(String[] args) {
            JavaSparkContext sc = null;
            try {
                //初始化 JavaSparkContext
                SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
                sc = new JavaSparkContext(conf);
    
                JavaRDD<String> rdd = sc.textFile("test2.txt");
    
                System.out.println("初始分区数:" + rdd.getNumPartitions());
                System.out.println("初始分区划分器:" + rdd.partitioner().toString());
    
                JavaPairRDD<String, Integer> pairRDD = rdd.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
                    public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                        List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
                        String[] arr = s.split(",");
                        for (String ele : arr) {
                            list.add(new Tuple2<String, Integer>(ele, 1));
                        }
                        return list.iterator();
                    }
                }).cache();
    
                System.out.println("flatMapToPair后分区数:" + pairRDD.getNumPartitions());
                System.out.println("flatMapToPair后分区划分器:" + pairRDD.partitioner().toString());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (sc != null) {
                    sc.close();
                }
            }
        }
    }
    

    输出结果:

    初始分区数:2
    初始分区划分器:Optional.empty
    flatMapToPair后分区数:2
    flatMapToPair后分区划分器:Optional.empty
    

    结果显示,执行flatMapToPair后,分区和划分器都没有发生变化,也说明flatMapToPair操作不会触发RDD默认的划分器。

    5. 执行groupByKey操作,打印分区数和划分器

    JavaPairRDD<String, Iterable<Integer>> groupRDD = pairRDD.groupByKey(4);
    System.out.println("groupByKey后分区数:" + groupRDD.getNumPartitions());
    System.out.println("groupByKey后分区划分器:" + groupRDD.partitioner().toString());
    

    输出结果:

    初始分区数:2
    初始分区划分器:Optional.empty
    flatMapToPair后分区数:2
    flatMapToPair后分区划分器:Optional.empty
    groupByKey后分区数:4
    groupByKey后分区划分器:Optional[org.apache.spark.HashPartitioner@4]
    

    结果显示,执行groupByKey 函数触发了RDDHashPartitioner分区划分器。

    6. 执行sortByKey操作,打印分区数和划分器

    JavaPairRDD<String, Iterable<Integer>> sortPartitionRDD = groupRDD.sortByKey();
    System.out.println("sortByKey后分区数:" + sortPartitionRDD.getNumPartitions());
    System.out.println("sortByKey后分区划分器:" + sortPartitionRDD.partitioner().toString());
    
    初始分区数:2
    初始分区划分器:Optional.empty
    flatMapToPair后分区数:2
    flatMapToPair后分区划分器:Optional.empty
    groupByKey后分区数:4
    groupByKey后分区划分器:Optional[org.apache.spark.HashPartitioner@4]
    sortByKey后分区数:4
    sortByKey后分区划分器:Optional[org.apache.spark.RangePartitioner@b2ba98ec]
    

    结果显示,执行sortByKey 函数触发了RDDRangePartitioner分区划分器,由于sortByKey前正在使用的是HashPartitioner,所以RDD被重新划分区。
    补充:sortByKey 方法有多个重载,包括可以指定排序规则等。

    7. 第5步用一下代码替换,输出结果相同

    JavaPairRDD<String, Integer> hashPartitionRDD = pairRDD.partitionBy(new HashPartitioner(4)).cache();
    System.out.println("partitionBy后分区数:" + hashPartitionRDD.getNumPartitions());
    System.out.println("partitionBy后分区划分器:" + hashPartitionRDD.partitioner().toString());
    

    8. 手动指定调用RangePartitioner分区器,这个分区器用Java实现把我难住了好久,主要是构造函数都看不懂,而且网上demo很难找。功夫不负有心人,在github上搜到了 Partitioning.java

    //Range 分区器
    RDD<Tuple2<String, Integer>> prdd = JavaPairRDD.toRDD(pairRDD);
    RangePartitioner rangePartitioner = new RangePartitioner(4,
        prdd,//待排序元 rdd
        true,//升序
        scala.math.Ordering.String$.MODULE$,//排序类型
        scala.reflect.ClassTag$.MODULE$.apply(String.class));//反射?
    
    //指定分区划分器
    JavaPairRDD<String, Integer> rangePartitionRDD = pairRDD.partitionBy(rangePartitioner).cache();
    System.out.println("partitionBy后分区数:" + rangePartitionRDD.getNumPartitions());
    System.out.println("partitionBy后分区划分器:" + rangePartitionRDD.partitioner().toString());
    
    //打印分区索引:key:value
    Partitioner partitioner = rangePartitionRDD.partitioner().get();
    System.out.println("分区数:" + partitioner.numPartitions());
    List<Tuple2<String, Integer>> resultRange = rangePartitionRDD.collect();
    for (Tuple2<String, Integer> tuple2 : resultRange) {
        System.out.println(partitioner.getPartition(tuple2._1()) + ":" + tuple2._1() + ":" + tuple2._2());
    }
    

    输出结果:

    初始分区数:2
    初始分区划分器:Optional.empty
    flatMapToPair后分区数:2
    flatMapToPair后分区划分器:Optional.empty
    partitionBy后分区数:4
    partitionBy后分区划分器:Optional[org.apache.spark.RangePartitioner@b2ab910c]
    分区数:4
    0:aaa:1
    0:bbb:1
    0:aaa:1
    1:ccc:1
    1:ccc:1
    2:ddd:1
    2:eee:1
    2:eee:1
    3:fff:1
    3:ggg:1
    

    9. 自定义分区器,直接上代码

    package com.yzy.spark;
    
    import org.apache.spark.Partitioner;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class MyPartitioner extends Partitioner {
        private int partitions; //分区数
    
        // RDD 内部数据和分区索引
        private Map<Object, Integer> hashCodePartitionIndexMap = new ConcurrentHashMap<Object, Integer>();
    
        //构造函数:key的集合,分区数(可选)
        public MyPartitioner(List<String> keys, int partitionsNum) throws Exception {
            if (keys == null || keys.isEmpty()) {
                throw new Exception("keys 不能为空");
            }
            this.partitions = partitionsNum > 0 ? partitionsNum : keys.size();
            for (int i = 0; i < keys.size(); i++) {
                 hashCodePartitionIndexMap.put(keys.get(i), i % this.partitions);
            }
        }
    
        @Override
        public int numPartitions() {
            return this.partitions;
        }
    
        @Override
        public int getPartition(Object key) {
            return hashCodePartitionIndexMap.get(key.toString());
        }
    
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof MyPartitioner) {
                return ((MyPartitioner) obj).partitions == this.partitions;
            }
            return false;
        }
    
        @Override
        public int hashCode() {
            return this.partitions;
        }
    }
    

    demo8 改造

    //获得key集合(去重)
    List<String> keys = pairRDD.map(new Function<Tuple2<String, Integer>, String>() {
        public String call(Tuple2<String, Integer> v1) throws Exception {
            return v1._1();
        }
    }).distinct().collect();
    
    //自定义分区
    JavaPairRDD<String, Integer> myPartitionRDD = pairRDD.partitionBy(new MyPartitioner(keys, 4)).cache();
    System.out.println("partitionBy后分区数:" + myPartitionRDD.getNumPartitions());
    System.out.println("partitionBy后分区划分器:" + myPartitionRDD.partitioner().toString());
    

    结果输出:

    初始分区数:2
    初始分区划分器:Optional.empty
    flatMapToPair后分区数:2
    flatMapToPair后分区划分器:Optional.empty
    partitionBy后分区数:4
    partitionBy后分区划分器:Optional[com.yzy.spark.MyPartitioner@4]
    

    结果显示,自定义分区已生效,接下来打印一下partitioner的分区信息

    Partitioner partitioner = myPartitionRDD.partitioner().get();
    System.out.println("分区数:" + partitioner.numPartitions());
    for (String k : keys) {
        System.out.println(k + " 的分区ID:" + partitioner.getPartition(k));
    }
    

    结果输出:

    分区数:4
    bbb 的分区ID:0
    ddd 的分区ID:1
    fff 的分区ID:2
    ggg 的分区ID:3
    eee 的分区ID:0
    ccc 的分区ID:1
    aaa 的分区ID:2
    

    相关文章

      网友评论

          本文标题:Java Spark 简单示例(八) 自定义分区划分器

          本文链接:https://www.haomeiwen.com/subject/udwpoftx.html