GeoSpark源码解析(三)
本节我们在来看一个SpatialRDD
的成员indexedRawRDD
public class SpatialRDD<T extends Geometry>
implements Serializable
{
/**
* The raw spatial RDD.
*/
public JavaRDD<T> rawSpatialRDD;
/**
* The spatial partitioned RDD.
*/
public JavaRDD<T> spatialPartitionedRDD;
...
}}
分区可以说是Spark的一个重要特性,幸运的是,GeoSpark自定义了分区策略,以支持空间对象分区。rawSpatialRDD
和spatialPartitionedRDD
的区别就是spatialPartitionedRDD
保存的是rawSpatialRDD
分区后的RDD。我们来看下GeoSpark是如何实现自定义分区策略的。
我们首先从SpatialRDD
的spatialPartitioning
方法看,这里首先要传入一个SpatialPartitioner
对象。
public void spatialPartitioning(SpatialPartitioner partitioner)
{
this.partitioner = partitioner;
this.spatialPartitionedRDD = partition(partitioner);
}
SpatialPartitioner
是一个抽象类,继承了Spark中的Partitioner
方法,可以看到,若想自定义分区策略,那么只需要实现这两个函数,第一个函数是告诉Spark要分成多少区,第二个函数是将对象与分区ID对应起来。
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
GeoSpark实现了三种分区策略,分别是QuadTreePartitioner
,KDBTreePartitioner
,FlatGridPartitioner
。在选定分区策略后,Geospark就开始调用private JavaRDD<T> partition(final SpatialPartitioner partitioner)
方法来进行分区,它是一个私有方法,我们来看他的实现(这里截取了实现的上半部分)
private JavaRDD<T> partition(final SpatialPartitioner partitioner)
{
return this.rawSpatialRDD.flatMapToPair(
new PairFlatMapFunction<T, Integer, T>()
{
@Override
public Iterator<Tuple2<Integer, T>> call(T spatialObject)
throws Exception
{
return partitioner.placeObject(spatialObject);
}
}
).partitionBy(partitioner)
...
}
因为Spark的paritionBy
需要一个PairRDD(实际上,Spark的paritionBy
函数也就是将PariRDD的第一个值传给partitioner
获得一个分区ID),所以GeoSpark就先将RDD转为PairRDD,这里注意placeObject
这个方法
@Override
public <T extends Geometry> Iterator<Tuple2<Integer, T>> placeObject(T spatialObject)
throws Exception
{
Objects.requireNonNull(spatialObject, "spatialObject");
final int overflowContainerID = grids.size();
final Envelope envelope = spatialObject.getEnvelopeInternal();
Set<Tuple2<Integer, T>> result = new HashSet();
boolean containFlag = false;
for (int i = 0; i < grids.size(); i++) {
final Envelope grid = grids.get(i);
if (grid.covers(envelope)) {
result.add(new Tuple2(i, spatialObject));
containFlag = true;
}
else if (grid.intersects(envelope) || envelope.covers(grid)) {
result.add(new Tuple2<>(i, spatialObject));
}
}
if (!containFlag) {
result.add(new Tuple2<>(overflowContainerID, spatialObject));
}
return result.iterator();
}
以第12行为例,partitioner
会首先建好格网,然后对格网进行遍历,若这个格网范围包含或与这个Geometry相交,那就将这格网ID和Geometry构造成一个Tuple并返回,这里的格网ID就是分区ID了。
然后再将PairRDD转为RDD,就完成了分区操作。
那这里有个问题,就是partitioner
中的格网是如何构建的?并且我们常常调用的是public void spatialPartitioning(GridType gridType, int numPartitions)
这个方法,那GeoSpark是如何根据GridType和numPartitions构建格网呢? 我们下节再来分析。
网友评论