美文网首页
(十四)GeoSpark源码解析(三)

(十四)GeoSpark源码解析(三)

作者: Scially | 来源:发表于2019-07-31 19:41 被阅读0次

    GeoSpark源码解析(三)

    本节我们在来看一个SpatialRDD的成员indexedRawRDD

    public class SpatialRDD<T extends Geometry>
            implements Serializable
    {
        /**
         * The raw spatial RDD.
         */
        public JavaRDD<T> rawSpatialRDD;
        /**
         * The spatial partitioned RDD.
         */
        public JavaRDD<T> spatialPartitionedRDD;
        
        ...
     }}
    

    分区可以说是Spark的一个重要特性,幸运的是,GeoSpark自定义了分区策略,以支持空间对象分区。rawSpatialRDDspatialPartitionedRDD的区别就是spatialPartitionedRDD保存的是rawSpatialRDD分区后的RDD。我们来看下GeoSpark是如何实现自定义分区策略的。

    我们首先从SpatialRDDspatialPartitioning方法看,这里首先要传入一个SpatialPartitioner对象。

    public void spatialPartitioning(SpatialPartitioner partitioner)
    {
        this.partitioner = partitioner;
        this.spatialPartitionedRDD = partition(partitioner);
    }
    

    SpatialPartitioner是一个抽象类,继承了Spark中的Partitioner方法,可以看到,若想自定义分区策略,那么只需要实现这两个函数,第一个函数是告诉Spark要分成多少区,第二个函数是将对象与分区ID对应起来。

    image.png
    abstract class Partitioner extends Serializable {
      def numPartitions: Int
      def getPartition(key: Any): Int
    }
    

    GeoSpark实现了三种分区策略,分别是QuadTreePartitioner,KDBTreePartitioner,FlatGridPartitioner。在选定分区策略后,Geospark就开始调用private JavaRDD<T> partition(final SpatialPartitioner partitioner)方法来进行分区,它是一个私有方法,我们来看他的实现(这里截取了实现的上半部分)

    private JavaRDD<T> partition(final SpatialPartitioner partitioner)
        {
            return this.rawSpatialRDD.flatMapToPair(
                    new PairFlatMapFunction<T, Integer, T>()
                    {
                        @Override
                        public Iterator<Tuple2<Integer, T>> call(T spatialObject)
                                throws Exception
                        {
                            return partitioner.placeObject(spatialObject);
                        }
                    }
            ).partitionBy(partitioner)
            ...
        }
    

    因为Spark的paritionBy需要一个PairRDD(实际上,Spark的paritionBy函数也就是将PariRDD的第一个值传给partitioner获得一个分区ID),所以GeoSpark就先将RDD转为PairRDD,这里注意placeObject这个方法

     @Override
        public <T extends Geometry> Iterator<Tuple2<Integer, T>> placeObject(T spatialObject)
                throws Exception
        {
            Objects.requireNonNull(spatialObject, "spatialObject");
            final int overflowContainerID = grids.size();
            final Envelope envelope = spatialObject.getEnvelopeInternal();
            Set<Tuple2<Integer, T>> result = new HashSet();
            boolean containFlag = false;
            for (int i = 0; i < grids.size(); i++) {
                final Envelope grid = grids.get(i);
                if (grid.covers(envelope)) {
                    result.add(new Tuple2(i, spatialObject));
                    containFlag = true;
                }
                else if (grid.intersects(envelope) || envelope.covers(grid)) {
                    result.add(new Tuple2<>(i, spatialObject));
                }
            }
            if (!containFlag) {
                result.add(new Tuple2<>(overflowContainerID, spatialObject));
            }
            return result.iterator();
        }
    

    以第12行为例,partitioner会首先建好格网,然后对格网进行遍历,若这个格网范围包含或与这个Geometry相交,那就将这格网ID和Geometry构造成一个Tuple并返回,这里的格网ID就是分区ID了。

    然后再将PairRDD转为RDD,就完成了分区操作。

    那这里有个问题,就是partitioner中的格网是如何构建的?并且我们常常调用的是public void spatialPartitioning(GridType gridType, int numPartitions)这个方法,那GeoSpark是如何根据GridType和numPartitions构建格网呢? 我们下节再来分析。

    相关文章

      网友评论

          本文标题:(十四)GeoSpark源码解析(三)

          本文链接:https://www.haomeiwen.com/subject/toqxdctx.html