(13) GeoSpark Source Code Analysis (Part 2)

Author: Scially | Published 2019-07-27 16:13

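In this part we again use queries as the example and look at how GeoSpark wraps the spatial indexes provided by JTS. Last time we took a quick look at SpatialRDD and its member JavaRDD<T> rawSpatialRDD: without an index, a query works because GeoSpark supplies a match method and runs it through an ordinary RDD transformation (filter, in the case of a range query). This time we look at another SpatialRDD member, indexedRawRDD.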

public class SpatialRDD<T extends Geometry>
        implements Serializable
{
    /**
     * The raw spatial RDD.
     */
    public JavaRDD<T> rawSpatialRDD;

    /**
     * The indexed raw RDD.
     */
    public JavaRDD<SpatialIndex> indexedRawRDD;

    ...
}

indexedRawRDD is built by calling buildIndex(final IndexType indexType, boolean buildIndexOnSpatialPartitionedRDD). Let's look at how this method builds the index:

public void buildIndex(final IndexType indexType, boolean buildIndexOnSpatialPartitionedRDD)
        throws Exception
{
    if (!buildIndexOnSpatialPartitionedRDD) {
        // This index is built on top of the unpartitioned SRDD
        this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new IndexBuilder(indexType));
    }
    else {
        if (this.spatialPartitionedRDD == null) {
            throw new Exception("[AbstractSpatialRDD][buildIndex] spatialPartitionedRDD is null. Please do spatial partitioning before build index.");
        }
        // This index is built on top of the spatially partitioned RDD
        this.indexedRDD = this.spatialPartitionedRDD.mapPartitions(new IndexBuilder(indexType));
    }
}

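Let's start with the case where buildIndexOnSpatialPartitionedRDD is false (the partitioned case will be covered next time). The statement this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new IndexBuilder(indexType)); again calls mapPartitions, but this time it passes an IndexBuilder object. Here is that class: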

public final class IndexBuilder<T extends Geometry>
        implements FlatMapFunction<Iterator<T>, SpatialIndex>
{
    IndexType indexType;
    public IndexBuilder(IndexType indexType)
    {
        this.indexType = indexType;
    }
    @Override
    public Iterator<SpatialIndex> call(Iterator<T> objectIterator)
            throws Exception
    {
        SpatialIndex spatialIndex;
        if (indexType == IndexType.RTREE) {
            spatialIndex = new STRtree();
        }
        else {
            spatialIndex = new Quadtree();
        }
        while (objectIterator.hasNext()) {
            T spatialObject = objectIterator.next();
            spatialIndex.insert(spatialObject.getEnvelopeInternal(), spatialObject);
        }
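        Set<SpatialIndex> result = new HashSet<>();
        // Dummy query: JTS's STRtree builds its tree lazily on the first query and rejects inserts afterwards,
        // so this forces the build inside the task; for a Quadtree it has no lasting effect.
        spatialIndex.query(new Envelope(0.0, 0.0, 0.0, 0.0));
        result.add(spatialIndex);
        return result.iterator();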
    }
}

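The whole class is only about 30 lines and does something simple: it supplies the function that mapPartitions applies to each partition. indexType selects the kind of index; GeoSpark uses two JTS index structures here, STRtree when indexType is IndexType.RTREE and Quadtree for any other value. For how they work, see any GIS textbook. The while loop inserts every Geometry in the partition into the index tree, keyed by its envelope. Because Spark requires call to return an iterator, GeoSpark puts spatialIndex into a Set and returns that Set's iterator. One more point follows from this: each partition ends up with exactly one index, which matches how spatial indexes are used in practice in GIS.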
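To make the "one partition, one index" behaviour concrete, here is a minimal plain-Java sketch that uses neither Spark nor JTS. It assumes a hypothetical mapPartitions helper that applies a function once per partition (standing in for JavaRDD.mapPartitions) and a hypothetical LazyIndex class that, like JTS's STRtree, is built lazily on its first query and refuses inserts afterwards; that lazy build is presumably why IndexBuilder issues the dummy query with new Envelope(0.0, 0.0, 0.0, 0.0) before returning. The "index" here is just a sorted list, not a real R-tree.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class PerPartitionIndexSketch {

    // Hypothetical axis-aligned envelope (stand-in for a JTS Envelope).
    static final class Rect {
        final double minX, minY, maxX, maxY;
        Rect(double minX, double minY, double maxX, double maxY) {
            this.minX = minX;
            this.minY = minY;
            this.maxX = maxX;
            this.maxY = maxY;
        }
        boolean intersects(Rect o) {
            return minX <= o.maxX && o.minX <= maxX && minY <= o.maxY && o.minY <= maxY;
        }
    }

    // Hypothetical index mimicking STRtree's lifecycle: built on first query, no inserts afterwards.
    static final class LazyIndex {
        private final List<Rect> items = new ArrayList<>();
        private boolean built = false;

        void insert(Rect r) {
            if (built) throw new IllegalStateException("index already built");
            items.add(r);
        }

        List<Rect> query(Rect window) {
            build();
            List<Rect> hits = new ArrayList<>();
            for (Rect r : items) {
                if (r.intersects(window)) hits.add(r);
            }
            return hits;
        }

        private void build() {
            if (built) return;
            items.sort(Comparator.comparingDouble((Rect r) -> r.minX)); // stand-in for STR packing
            built = true;
        }

        boolean isBuilt() {
            return built;
        }
    }

    // Plays the role of IndexBuilder.call: consume one partition, emit exactly one index.
    static Iterator<LazyIndex> buildIndex(Iterator<Rect> partition) {
        LazyIndex index = new LazyIndex();
        while (partition.hasNext()) index.insert(partition.next());
        index.query(new Rect(0, 0, 0, 0)); // dummy query forces the build
        return Collections.singletonList(index).iterator();
    }

    // Hypothetical stand-in for JavaRDD.mapPartitions: f runs once per partition.
    static <T, R> List<R> mapPartitions(List<List<T>> partitions, Function<Iterator<T>, Iterator<R>> f) {
        List<R> out = new ArrayList<>();
        for (List<T> p : partitions) {
            f.apply(p.iterator()).forEachRemaining(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Rect>> partitions = List.of(
                List.of(new Rect(0, 0, 1, 1), new Rect(2, 2, 3, 3)),
                List.of(new Rect(5, 5, 6, 6)),
                List.of(new Rect(8, 8, 9, 9), new Rect(1, 1, 4, 4), new Rect(7, 0, 8, 1)));
        List<LazyIndex> indexes = mapPartitions(partitions, PerPartitionIndexSketch::buildIndex);
        System.out.println(partitions.size() + " partitions -> " + indexes.size() + " indexes");
        try {
            indexes.get(0).insert(new Rect(0, 0, 1, 1));
        } catch (IllegalStateException e) {
            System.out.println("insert after build rejected: " + e.getMessage());
        }
    }
}
```

Running main prints 3 partitions -> 3 indexes: however many geometries a partition holds, buildIndex emits a single index object for it, and that index is already built when it leaves the function.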

Once the index is built, it can be used for parallel analysis. Again, we take queries as the example:

public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex)
        throws Exception
{
    U queryGeometry = originalQueryGeometry;
    if (spatialRDD.getCRStransformation()) {
        queryGeometry = CRSTransformation.Transform(spatialRDD.getSourceEpsgCode(), spatialRDD.getTargetEpgsgCode(), originalQueryGeometry);
    }

    if (useIndex) {
        if (spatialRDD.indexedRawRDD == null) {
            throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
        }
        return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
    }
    else {
        return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
    }
}

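GeoSpark first checks whether an index should be used (if (useIndex)), then whether the index has actually been built, throwing an exception if it has not. If it has, it executes return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));. Let's look at the RangeFilterUsingIndex class: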

public class RangeFilterUsingIndex<U extends Geometry, T extends Geometry>
        extends JudgementBase
        implements FlatMapFunction<Iterator<SpatialIndex>, T>
{
    public RangeFilterUsingIndex(U queryWindow, boolean considerBoundaryIntersection, boolean leftCoveredByRight)
    {
        super(queryWindow, considerBoundaryIntersection, leftCoveredByRight);
    }
    @Override
    public Iterator<T> call(Iterator<SpatialIndex> treeIndexes)
            throws Exception
    {
        assert treeIndexes.hasNext(); // each partition holds exactly one index
        SpatialIndex treeIndex = treeIndexes.next();
        List<T> results = new ArrayList<T>();
        List<T> tempResults = treeIndex.query(this.queryGeometry.getEnvelopeInternal()); // envelope filter: candidates only
        for (T tempResult : tempResults) {
            if (leftCoveredByRight) {
                if (match(tempResult, queryGeometry)) {
                    results.add(tempResult);
                }
            }
            else {
                if (match(queryGeometry, tempResult)) {
                    results.add(tempResult);
                }
            }
        }
        return results.iterator();
    }
}

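Look at the call method. It first takes the index out with SpatialIndex treeIndex = treeIndexes.next();, then queries the index tree with the envelope of the query window queryGeometry, which quickly returns the Geometries in that area. Because the index only compares envelopes (bounding boxes), this result is a candidate set, not the exact answer. The for loop then walks over tempResults and calls match on each candidate. Compared with a direct search, the advantage is that we no longer test every object in the partition, only the candidates, which pays off significantly when the data volume is large. match is defined in the parent class JudgementBase: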

public boolean match(Geometry spatialObject, Geometry queryWindow)
{
    if (considerBoundaryIntersection) {
        if (queryWindow.intersects(spatialObject)) { return true; }
    }
    else {
        if (queryWindow.covers(spatialObject)) { return true; }
    }
    return false;
}

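Both branches hand the actual decision to JTS: when considerBoundaryIntersection is true, queryWindow.intersects(spatialObject) accepts any object that touches the window, boundary included; otherwise queryWindow.covers(spatialObject) accepts only objects lying entirely within the window. That makes everything clear: just as in the non-indexed case, GeoSpark supplies the match method and lets Spark do the computation.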
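The filter-and-refine pattern in RangeFilterUsingIndex, together with the intersects/covers switch in match, can be illustrated with a small plain-Java sketch that depends on neither JTS nor Spark. Everything in it is hypothetical: the geometries are circles, the query window is an axis-aligned rectangle, and envelopeFilter is a linear loop standing in for treeIndex.query (a real STRtree or Quadtree would skip most envelopes without visiting them). The point is that the envelope step only yields candidates, and the exact predicate chosen by considerBoundaryIntersection decides the final result.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterRefineSketch {

    // Hypothetical axis-aligned rectangle: used for the query window and for envelopes.
    static final class Rect {
        final double minX, minY, maxX, maxY;
        Rect(double minX, double minY, double maxX, double maxY) {
            this.minX = minX;
            this.minY = minY;
            this.maxX = maxX;
            this.maxY = maxY;
        }
        boolean intersects(Rect o) {
            return minX <= o.maxX && o.minX <= maxX && minY <= o.maxY && o.minY <= maxY;
        }
    }

    // Hypothetical circular "geometry" standing in for a JTS Geometry.
    static final class Circle {
        final String name;
        final double cx, cy, r;
        Circle(String name, double cx, double cy, double r) {
            this.name = name;
            this.cx = cx;
            this.cy = cy;
            this.r = r;
        }
        // Bounding box, playing the role of getEnvelopeInternal().
        Rect envelope() {
            return new Rect(cx - r, cy - r, cx + r, cy + r);
        }
    }

    // Exact test: does the window share at least one point with the circle?
    static boolean windowIntersects(Rect w, Circle c) {
        double dx = Math.max(Math.max(w.minX - c.cx, 0), c.cx - w.maxX);
        double dy = Math.max(Math.max(w.minY - c.cy, 0), c.cy - w.maxY);
        return dx * dx + dy * dy <= c.r * c.r;
    }

    // Exact test: does the window contain the whole circle?
    static boolean windowCovers(Rect w, Circle c) {
        return c.cx - c.r >= w.minX && c.cx + c.r <= w.maxX
                && c.cy - c.r >= w.minY && c.cy + c.r <= w.maxY;
    }

    // Same shape as JudgementBase.match: the flag picks intersects or covers.
    static boolean match(Circle obj, Rect window, boolean considerBoundaryIntersection) {
        return considerBoundaryIntersection ? windowIntersects(window, obj) : windowCovers(window, obj);
    }

    // Filter step: keep objects whose envelope intersects the window (candidates only).
    static List<Circle> envelopeFilter(List<Circle> data, Rect window) {
        List<Circle> candidates = new ArrayList<>();
        for (Circle c : data) {
            if (c.envelope().intersects(window)) candidates.add(c);
        }
        return candidates;
    }

    // Refine step: apply the exact predicate to each candidate.
    static List<String> refine(List<Circle> candidates, Rect window, boolean boundary) {
        List<String> result = new ArrayList<>();
        for (Circle c : candidates) {
            if (match(c, window, boundary)) result.add(c.name);
        }
        return result;
    }

    // Non-indexed path: exact predicate on every object.
    static List<String> fullScan(List<Circle> data, Rect window, boolean boundary) {
        return refine(data, window, boundary);
    }

    public static void main(String[] args) {
        Rect window = new Rect(0, 0, 10, 10);
        List<Circle> data = List.of(
                new Circle("A", 5, 5, 1),    // inside the window
                new Circle("B", 10, 5, 2),   // crosses the right edge
                new Circle("C", 12, 12, 2),  // envelope touches the corner, circle does not
                new Circle("D", 30, 30, 1)); // far away
        List<Circle> candidates = envelopeFilter(data, window);
        System.out.println("candidates: " + candidates.size());                 // 3
        System.out.println("intersects: " + refine(candidates, window, true));  // [A, B]
        System.out.println("covers:     " + refine(candidates, window, false)); // [A]
        System.out.println("matches full scan: "
                + refine(candidates, window, true).equals(fullScan(data, window, true))); // true
    }
}
```

D is pruned by the envelope test alone. C's envelope touches the window's corner, so it survives the filter step but is rejected by the exact test. B crosses the right edge, so it is returned when boundary intersections count (intersects) but not when the window must contain it (covers). The refined result equals the full scan, which is the guarantee the indexed path relies on.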

With that, we have covered both indexed and non-indexed queries. Next time we will look at another important Spark feature: partitioning.

Article link: https://www.haomeiwen.com/subject/gryerctx.html