美文网首页
(十二)Geospark源码解析(一)

(十二)Geospark源码解析(一)

作者: Scially | 来源:发表于2019-07-26 17:26 被阅读0次

    Geospark源码解析(一)

    本节我们以查询为例,看下GeoSpark如何利用分布式来实现高效查询。首先,对于Spark来说,想要利用Spark,必须要将自己的类型转为RDD,我们就先看下Geospark是如何读取GeoJson,并将Geometry转为RDD的。

    public class SpatialRDD<T extends Geometry>
            implements Serializable
    {
        /**
         * The raw spatial RDD.
         */
        public JavaRDD<T> rawSpatialRDD;
        
        ...
     }}
    

    Geospark自定义了一个RDD,SpatialRDD,他是一个泛型类,并且泛型要求是Geometry的子类,对于Geometry来说,他的子类有PointLinePolygon等,这个大家可以去看JTS库http://www.tsusiatsoftware.net/jts/main.html。然后我这里列举了SpatialRDD一个重要的成员,对于rawSpatialRDD来说,他里面存储的就是我们的需要分析的Geometry

    GeoSpark提供了PointRDDPolygonRDD等,他们都继承自SpatialRDD,我们以PointRDD为例,分析一下GeoSpark是如何将geojson转为RDD的。

    public PointRDD(JavaSparkContext sparkContext, String InputLocation, Integer Offset, FileDataSplitter splitter,
                boolean carryInputData, Integer partitions, StorageLevel newLevel, String sourceEpsgCRSCode, String targetEpsgCode)
        {
            JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext.textFile(InputLocation);
            if (Offset != null) {this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter, carryInputData)));}
            else {this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(splitter, carryInputData)));}
            if (sourceEpsgCRSCode != null && targetEpsgCode != null) { this.CRSTransform(sourceEpsgCRSCode, targetEpsgCode);}
            if (newLevel != null) { this.analyze(newLevel);}
            if (splitter.equals(FileDataSplitter.GEOJSON)) { this.fieldNames = FormatMapper.readGeoJsonPropertyNames(rawTextRDD.take(1).get(0).toString()); }
        }
    

    这是PointRDD常用的一个构造函数,其中第4行JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext.textFile(InputLocation);则是利用Spark的原生方法将geojson首先转为一个RDD,他的类型可以理解为是String,第7行if (sourceEpsgCRSCode != null && targetEpsgCode != null) { this.CRSTransform(sourceEpsgCRSCode, targetEpsgCode);}则是做了一个坐标转换,关键是第5行this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter, carryInputData)));

    在第5行中,Geospark首先调用了mapPartitions方法来将rawTextRDD中的每一行转为Geometry,其中pointFormatMapper中有一个方法

    public Iterator<T> call(Iterator<String> stringIterator)
                throws Exception
        {
            List<T> result = new ArrayList<>();
            while (stringIterator.hasNext()) {
                String line = stringIterator.next();
                addGeometry(readGeometry(line), result);
            }
            return result.iterator();
        }
    

    他是一个重载,函数参数stringIterator是每个分区的所有string,Geospark遍历这个集合,在每一行调用了一个addGeometry方法,将String转为Geometry,这个方法就不细讲,主要是解析GeoJson,感兴趣的可以去看GeoSpark源码。

    这样构造完成后,就将GeoJson转为了一个RDD,此时我们还没有构建空间索引,但是对于大数据量的空间数据我们已经可以利用Spark的RDD进行并行计算了。

    public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex)
                throws Exception
        {
            U queryGeometry = originalQueryGeometry;
            if (spatialRDD.getCRStransformation()) {
                queryGeometry = CRSTransformation.Transform(spatialRDD.getSourceEpsgCode(), spatialRDD.getTargetEpgsgCode(), originalQueryGeometry);
            }
    
            if (useIndex == true) {
                if (spatialRDD.indexedRawRDD == null) {
                    throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
                }
                return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
            }
            else {
                return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
            }
        }
    

    这里我们看第16行return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));在第9行if (useIndex == true)判断不用索引时,就会跳到第16行,本质上还是用了RDD来利用自定义函数进行判断,如果是真,就过滤出来,我们看RangeFilter这个类。

    public class RangeFilter<U extends Geometry, T extends Geometry>
            extends JudgementBase
            implements Function<T, Boolean>
    {
        public RangeFilter(U queryWindow, boolean considerBoundaryIntersection, boolean leftCoveredByRight)
        {
            super(queryWindow, considerBoundaryIntersection, leftCoveredByRight);
        }
        public Boolean call(T geometry)
                throws Exception
        {
            if (leftCoveredByRight) {
                return match(geometry, queryGeometry);
            }
            else {
                return match(queryGeometry, queryGeometry);
            }
        }
    }
    

    注意到call这个方法,里面又调用了match方法,它在父类JudgementBase定义有:

    public boolean match(Geometry spatialObject, Geometry queryWindow)
        {
            if (considerBoundaryIntersection) {
                if (queryWindow.intersects(spatialObject)) { return true; }
            }
            else {
                if (queryWindow.covers(spatialObject)) { return true; }
            }
            return false;
        }
    

    这里面,我们可以看到第4行和第7行均是利用了JTS来判断的,到这里,就一目了然了,实际上还是我们提供了match这个方法,利用Spark来计算。

    本文中,我们并没有涉及到索引,GeoSpark也将JTS的索引进行了封装,原理和上面讲的是一样的,我们下一篇文章中在进行分析。

    相关文章

      网友评论

          本文标题:(十二)Geospark源码解析(一)

          本文链接:https://www.haomeiwen.com/subject/jzvdrctx.html