GeoSpark Source Code Analysis (Part 2)
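In this part we again use queries as the running example and look at how GeoSpark wraps the spatial indexes provided by JTS. In the previous part we took a brief look at SpatialRDD and its member JavaRDD<T> rawSpatialRDD. When no index is used, the query is carried out by an ordinary RDD transformation to which GeoSpark supplies its match method. In this part we look at another member of SpatialRDD, indexedRawRDD: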
public class SpatialRDD<T extends Geometry>
        implements Serializable
{
    /**
     * The raw spatial RDD.
     */
    public JavaRDD<T> rawSpatialRDD;
    /**
     * The indexed raw RDD.
     */
    public JavaRDD<SpatialIndex> indexedRawRDD;
    ...
}
indexedRawRDD is not populated automatically; we have to call buildIndex(final IndexType indexType, boolean buildIndexOnSpatialPartitionedRDD) to build it. Let's see how this function builds the index:
public void buildIndex(final IndexType indexType, boolean buildIndexOnSpatialPartitionedRDD)
        throws Exception
{
    if (buildIndexOnSpatialPartitionedRDD == false) {
        //This index is built on top of unpartitioned SRDD
        this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new IndexBuilder(indexType));
    }
    else {
        if (this.spatialPartitionedRDD == null) {
            throw new Exception("[AbstractSpatialRDD][buildIndex] spatialPartitionedRDD is null. Please do spatial partitioning before build index.");
        }
        this.indexedRDD = this.spatialPartitionedRDD.mapPartitions(new IndexBuilder(indexType));
    }
}
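Let's first consider the case where buildIndexOnSpatialPartitionedRDD is false (the partitioned case is covered next time). Line 6, this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new IndexBuilder(indexType));, calls mapPartitions on the raw RDD and passes it an IndexBuilder object. Let's look at that class: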
public final class IndexBuilder<T extends Geometry>
        implements FlatMapFunction<Iterator<T>, SpatialIndex>
{
    IndexType indexType;
    public IndexBuilder(IndexType indexType)
    {
        this.indexType = indexType;
    }
    @Override
    public Iterator<SpatialIndex> call(Iterator<T> objectIterator)
            throws Exception
    {
        SpatialIndex spatialIndex;
        if (indexType == IndexType.RTREE) {
            spatialIndex = new STRtree();
        }
        else {
            spatialIndex = new Quadtree();
        }
        while (objectIterator.hasNext()) {
            T spatialObject = objectIterator.next();
            spatialIndex.insert(spatialObject.getEnvelopeInternal(), spatialObject);
        }
        Set<SpatialIndex> result = new HashSet();
        spatialIndex.query(new Envelope(0.0, 0.0, 0.0, 0.0));
        result.add(spatialIndex);
        return result.iterator();
    }
}
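The whole class is only about 30 lines long and does one simple thing: it provides the function that mapPartitions applies to each partition. indexType selects the kind of index to build; the code uses two JTS implementations, STRtree and Quadtree (see a GIS textbook for how they work). The while loop starting at line 20 inserts every Geometry in the partition into the index tree. Because Spark requires call to return an iterator, GeoSpark adds spatialIndex to a Set and returns that Set's iterator. One more thing follows from this: each partition ends up with exactly one index, which matches common practice in GIS.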
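To make the "one index per partition" point concrete, here is a minimal sketch in plain Java with no Spark or JTS dependency. Everything in it is a hypothetical stand-in: Box plays the role of a geometry's envelope, ToyIndex is a flat list rather than a real tree, and the mapPartitions helper only imitates what Spark does, namely call the builder once for each partition and collect whatever its iterator yields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a geometry's bounding box (a JTS Envelope in the real code). */
final class Box {
    final double minX, minY, maxX, maxY;

    Box(double minX, double minY, double maxX, double maxY) {
        this.minX = minX; this.minY = minY; this.maxX = maxX; this.maxY = maxY;
    }
}

/** Hypothetical stand-in for a spatial index: a flat list instead of a tree. */
final class ToyIndex {
    private final List<Box> items = new ArrayList<>();

    void insert(Box box) { items.add(box); }

    int size() { return items.size(); }
}

final class PartitionIndexSketch {
    /** Plays the role of IndexBuilder.call: consume one partition, emit exactly one index. */
    static Iterator<ToyIndex> buildIndex(Iterator<Box> partition) {
        ToyIndex index = new ToyIndex();
        while (partition.hasNext()) {
            index.insert(partition.next());
        }
        // The function must hand back an iterator, so wrap the single index in a collection.
        return Collections.singletonList(index).iterator();
    }

    /** Imitates mapPartitions: the builder runs once per partition, not once per element. */
    static List<ToyIndex> mapPartitions(List<List<Box>> partitions) {
        List<ToyIndex> out = new ArrayList<>();
        for (List<Box> partition : partitions) {
            Iterator<ToyIndex> built = buildIndex(partition.iterator());
            while (built.hasNext()) {
                out.add(built.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Box>> partitions = Arrays.asList(
                Arrays.asList(new Box(0, 0, 1, 1), new Box(2, 2, 3, 3)),
                Arrays.asList(new Box(5, 5, 6, 6)));
        List<ToyIndex> indexes = mapPartitions(partitions);
        System.out.println(indexes.size() + " indexes for " + partitions.size() + " partitions");
    }
}
```

With two input partitions the sketch yields two indexes, holding two and one entries respectively, which is the same shape as JavaRDD<SpatialIndex> in the GeoSpark code.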
Once the index has been built, it can be used for parallel analysis. We again take the range query as the example.
public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex)
        throws Exception
{
    U queryGeometry = originalQueryGeometry;
    if (spatialRDD.getCRStransformation()) {
        queryGeometry = CRSTransformation.Transform(spatialRDD.getSourceEpsgCode(), spatialRDD.getTargetEpgsgCode(), originalQueryGeometry);
    }
    if (useIndex == true) {
        if (spatialRDD.indexedRawRDD == null) {
            throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
        }
        return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
    }
    else {
        return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
    }
}
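GeoSpark first checks on line 8 whether an index should be used, then on line 9 whether one has actually been built. If it has, line 12 runs return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));. Let's look at the RangeFilterUsingIndex class.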
public class RangeFilterUsingIndex<U extends Geometry, T extends Geometry>
        extends JudgementBase
        implements FlatMapFunction<Iterator<SpatialIndex>, T>
{
    public RangeFilterUsingIndex(U queryWindow, boolean considerBoundaryIntersection, boolean leftCoveredByRight)
    {
        super(queryWindow, considerBoundaryIntersection, leftCoveredByRight);
    }
    @Override
    public Iterator<T> call(Iterator<SpatialIndex> treeIndexes)
            throws Exception
    {
        assert treeIndexes.hasNext() == true;
        SpatialIndex treeIndex = treeIndexes.next();
        List<T> results = new ArrayList<T>();
        List<T> tempResults = treeIndex.query(this.queryGeometry.getEnvelopeInternal());
        for (T tempResult : tempResults) {
            if (leftCoveredByRight) {
                if (match(tempResult, queryGeometry)) {
                    results.add(tempResult);
                }
            }
            else {
                if (match(queryGeometry, tempResult)) {
                    results.add(tempResult);
                }
            }
        }
        return results.iterator();
    }
}
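Look at the call method. Line 14, SpatialIndex treeIndex = treeIndexes.next();, takes the index treeIndex out of the partition. The index tree is then queried with the envelope of the query window queryGeometry to quickly find the Geometry objects in that range. Because this is an index lookup on bounding boxes, the result is only a candidate set, not an exact answer. Starting at line 17, the code iterates over tempResults and calls match on each candidate. Compared with a direct scan, the advantage is that we no longer have to test every object in the partition, which pays off when the data set is large. match is defined in the parent class JudgementBase: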
public boolean match(Geometry spatialObject, Geometry queryWindow)
{
    if (considerBoundaryIntersection) {
        if (queryWindow.intersects(spatialObject)) { return true; }
    }
    else {
        if (queryWindow.covers(spatialObject)) { return true; }
    }
    return false;
}
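Here lines 4 and 7 both use JTS predicates (intersects and covers) to make the decision. At this point everything is clear: in the end GeoSpark just supplies the match method, and Spark does the computation.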
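The two-step pattern described above (a coarse lookup by bounding box, then an exact check with match) can be illustrated with a small self-contained sketch. It is not GeoSpark code: Disc and Window are hypothetical stand-ins for a Geometry and a rectangular query window, and candidates is a linear scan that only mimics what a tree lookup by envelope would return. Discs are used because their bounding box can overlap the window even when the disc itself does not, which is exactly the kind of false positive the refine step removes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical rectangular query window. */
final class Window {
    final double minX, minY, maxX, maxY;

    Window(double minX, double minY, double maxX, double maxY) {
        this.minX = minX; this.minY = minY; this.maxX = maxX; this.maxY = maxY;
    }

    /** Exact test: the window and the disc share at least one point. */
    boolean intersects(Disc d) {
        double nearestX = Math.max(minX, Math.min(maxX, d.cx));
        double nearestY = Math.max(minY, Math.min(maxY, d.cy));
        double dx = d.cx - nearestX;
        double dy = d.cy - nearestY;
        return dx * dx + dy * dy <= d.r * d.r;
    }

    /** Exact test: the whole disc lies inside the window or on its boundary. */
    boolean covers(Disc d) {
        return d.cx - d.r >= minX && d.cx + d.r <= maxX
                && d.cy - d.r >= minY && d.cy + d.r <= maxY;
    }
}

/** Hypothetical stand-in for a Geometry: a disc with center (cx, cy) and radius r. */
final class Disc {
    final double cx, cy, r;

    Disc(double cx, double cy, double r) { this.cx = cx; this.cy = cy; this.r = r; }

    /** Coarse test on the bounding box only, as an index lookup by envelope would do. */
    boolean envelopeIntersects(Window w) {
        return cx - r <= w.maxX && cx + r >= w.minX && cy - r <= w.maxY && cy + r >= w.minY;
    }
}

final class RangeQuerySketch {
    /** Same branching as match above: intersects when boundaries count, covers otherwise. */
    static boolean match(Disc spatialObject, Window queryWindow, boolean considerBoundaryIntersection) {
        return considerBoundaryIntersection
                ? queryWindow.intersects(spatialObject)
                : queryWindow.covers(spatialObject);
    }

    /** Filter step: candidates whose bounding box overlaps the window (may contain false positives). */
    static List<Disc> candidates(List<Disc> indexed, Window w) {
        List<Disc> out = new ArrayList<>();
        for (Disc d : indexed) {
            if (d.envelopeIntersects(w)) { out.add(d); }
        }
        return out;
    }

    /** Refine step: run the exact predicate on the candidates only. */
    static List<Disc> rangeQuery(List<Disc> indexed, Window w, boolean considerBoundaryIntersection) {
        List<Disc> results = new ArrayList<>();
        for (Disc d : candidates(indexed, w)) {
            if (match(d, w, considerBoundaryIntersection)) { results.add(d); }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Disc> data = Arrays.asList(
                new Disc(5, 5, 1),       // fully inside the window
                new Disc(10, 5, 2),      // straddles the right edge
                new Disc(12, 12, 2.5),   // bounding box clips the corner, the disc does not
                new Disc(30, 30, 1));    // far away
        Window w = new Window(0, 0, 10, 10);
        System.out.println(candidates(data, w).size());        // 3
        System.out.println(rangeQuery(data, w, true).size());  // 2
        System.out.println(rangeQuery(data, w, false).size()); // 1
    }
}
```

The third disc shows why the refine step is needed: it survives the bounding-box filter but is rejected by both exact predicates. The second disc shows the effect of considerBoundaryIntersection: it is returned when the flag is true (intersects) and dropped when it is false (covers).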
That completes our analysis of indexed and non-indexed queries. In the next part we look at another important Spark feature: partitioning.