美文网首页GeotrellisGIS后端
How it works(21) Geotrellis是如何在S

How it works(21) Geotrellis是如何在S

作者: 默而识之者 | 来源:发表于2021-08-11 22:48 被阅读0次

    1. 引入

    在上一章我们已经讨论了数据读取的实现,了解了Geotrellis如何从本地将若干Geotiff文件读取为RDD[(ProjectedExtent, MultibandTile)]对象的.

    为了进行下一步的计算操作,我们需要将原始的ProjectedExtent索引转换为SpatialKey索引.具体步骤如下:

    // 1. 提取数据集元数据
    val (_, metadata) = geoTiffRDD.collectMetadata[SpatialKey](FloatingLayoutScheme())
    // 2. 转换索引
    val tiledRDD = geoTiffRDD.tileToLayout[SpatialKey](metadata, NearestNeighbor)
    

    主要分为两个步骤:

    1. RDD[(ProjectedExtent, MultibandTile)]数据集中提取元数据.
    2. 根据提取的元数据,将原本的ProjectedExtent索引转换为SpatialKey索引,重新切割原始数据集以与新的索引匹配.

    按顺序我们先从提取元数据开始.

    2. 提取数据集元数据

    提取数据集元数据的代码如下:

    val (_, metadata) = geoTiffRDD.collectMetadata[SpatialKey](FloatingLayoutScheme())
    

    这部分的主体是collectMetadata[T]方法,整理后代码如下:

    [代码位于spark/Implicit.scala]

    implicit class withCollectMetadataMethods[K1, V <: CellGrid[Int]](rdd: RDD[(K1, V)]) extends Serializable {
        def collectMetadata[K2: Boundable: SpatialComponent](layoutScheme: LayoutScheme)
            (implicit ev: K1 => TilerKeyMethods[K1, K2], ev1: GetComponent[K1, ProjectedExtent]): (Int, TileLayerMetadata[K2]) = {
          CollectTileLayerMetadata.fromRDD[K1, V, K2](rdd, layoutScheme)
        }
    }
    

    该方法位于隐式类中.也包含多个隐式参数.隐式限定与隐式参数的匹配逻辑如下:

    1. 隐式类withCollectMetadataMethods位于spark/Implicit.scala,通过import geotrellis.spark._导入
    2. geoTiffRDD调用collectMetadata方法,该方法并非RDD方法,而是属于withCollectMetadataMethods类的方法,Scala开始在上下文中寻找RDD向withCollectMetadataMethods类型转换是否可行
    3. geoTiffRDD的类型为RDD[(ProjectedExtent, MultibandTile)],即K1为ProjectedExtent,V为MultibandTile,是CellGrid[Int]的扩展,符合V <: CellGrid[Int]的上界定义,因此可以被转换
    4. collectMetadata[SpatialKey]方法通过泛型类标签表明需两个与SpatialKey相关的隐式参数:
      1. Boundable[SpatialKey]的隐式转换定义于SpatialKey.scala:
      implicit object Boundable extends Boundable[SpatialKey] {
          def minBound(a: SpatialKey, b: SpatialKey) = {
              SpatialKey(math.min(a.col, b.col), math.min(a.row, b.row))
          }
          def maxBound(a: SpatialKey, b: SpatialKey) = {
              SpatialKey(math.max(a.col, b.col), math.max(a.row, b.row))
          }
      }
      
      1. SpatialComponent[SpatialKey]Component[SpatialKey, SpatialKey]的隐式转换定义于utils/package.scala:
      implicit def identityComponent[T]: Component[T, T] = Component(v => v, (_, v) => v)
      
    5. collectMetadata[SpatialKey]方法通过隐式参数列表表明需要两个与ProjectedExtent相关的参数ev和ev1:
      1. ev即ProjectedExtent => TilerKeyMethods[ProjectedExtent, SpatialKey],定义于spark/Implicit.scala:
      implicit class withProjectedExtentTilerKeyMethods[K: Component[*, ProjectedExtent]](val self: K) extends TilerKeyMethods[K, SpatialKey] {
          def extent = self.getComponent[ProjectedExtent].extent
          def translate(spatialKey: SpatialKey) = spatialKey
      }
      
      1. ev1即GetComponent[ProjectedExtent, ProjectedExtent],与Component[SpatialKey, SpatialKey]一致.

    我们回到函数本身.这部分的主体是CollectTileLayerMetadata.fromRDD方法,整理后代码如下:

    [代码位于CollectTileLayerMetadata.scala]

    def fromRDD[
        K: GetComponent[*, ProjectedExtent]: * => TilerKeyMethods[K, K2],
        V <: CellGrid[Int],
        K2: SpatialComponent: Boundable
      ](rdd: RDD[(K, V)], scheme: LayoutScheme): (Int, TileLayerMetadata[K2]) = {
        // 1. 收集数据集元数据
        val (extent, cellType, cellSize, bounds, crs) = collectMetadataWithCRS(rdd)
        // 2. 根据指定的LayoutScheme计算LayoutDefinition
        val LayoutLevel(zoom, layout) = scheme.levelFor(extent, cellSize)
        // 3. 计算SpatialKey索引范围
        val kb = bounds.setSpatialBounds(KeyBounds(layout.mapTransform(extent)))
        // 4. 返回创建的元数据对象
        (zoom, TileLayerMetadata(cellType, layout, extent, crs, kb))
      }
    

    代码分为4个步骤:

    1. 从RDD数据集中收集归纳各项信息,构成数据集的元数据
    2. 根据收集到的元数据创建LayoutDefinition对象
    3. 计算在此LayoutDefinition下SpatialKey索引的范围
    4. 构建TileLayerMetadata对象并返回

    按顺序我们先从收集数据集信息开始.

    2.1 收集数据集元数据

    针对整个数据集收集元数据面向的是若干可能存在不同参数的影像,为了后面统一处理,因此需要归纳/计算出一套能兼容全部数据的元数据.

    收集数据集元数据的代码如下:

    val (extent, cellType, cellSize, bounds, crs) = collectMetadataWithCRS(rdd)
    

    这部分的主体是collectMetadataWithCRS方法,整理后代码如下:

    [代码位于CollectTileLayerMetadata.scala]

      private def collectMetadataWithCRS[
        K: GetComponent[*, ProjectedExtent]: * => TilerKeyMethods[K, K2], // 从隐式函数可以推断出K2的类型,因此调用的时候没有传递类型
        V <: CellGrid[Int],
        K2: SpatialComponent: Boundable // 辅助类型推断
      ](rdd: RDD[(K, V)]): (Extent, CellType, CellSize, KeyBounds[K2], CRS) = {
        val (extent, cellType, cellSize, crsSet, bounds) =
          rdd
          .map { case (key, grid) =>
            // 1. 计算每个瓦片的元数据
            // 提取每一个ProjectedExtend所代表的的范围和CRS
            // getComponent调用了隐式类withGetComponentMethods的方法
            val ProjectedExtent(extent, crs) = key.getComponent[ProjectedExtent]
            // 将每一个boundsKey设置为初始值(0,0),实际值在之后的步骤再设置
            // translate调用了隐式类withProjectedExtentTilerKeyMethods的方法
            val boundsKey = key.translate(SpatialKey(0,0))
            // 返回该瓦片的元数据,便于后面Reduce操作
            (extent, grid.cellType, CellSize(extent, grid.cols, grid.rows), Set(crs), KeyBounds(boundsKey, boundsKey))
          }
          .reduce { (tuple1, tuple2) =>
            // 2. 融合元数据
            val (extent1, cellType1, cellSize1, crs1, bounds1) = tuple1
            val (extent2, cellType2, cellSize2, crs2, bounds2) = tuple2
            (
              extent1.combine(extent2), // 合并空间范围
              cellType1.union(cellType2), // 融合数据类型
              if (cellSize1.resolution < cellSize2.resolution) cellSize1 else cellSize2, // 单元格大小
              crs1 ++ crs2, // 合并非重复CRS
              bounds1.combine(bounds2) // 融合空间范围(实际全部为(0,0))
              )
          }
        (extent, cellType, cellSize, bounds, crsSet.head)
      }
    

    代码分为两个步骤:

    1. 从原始的ProjectedExtentMultibandTile对象中提取所需的元数据,包括:
      1. 每个瓦片对应的范围和空间参考
      2. 每个瓦片自身的尺寸和类型
    2. 合并单个瓦片的元数据以描述整个数据集,遵循以下规则:
      1. 针对Extent对象,进行空间范围融合,最终形成包含全部瓦片范围的空间范围
      2. 针对数据类型,最终留下所占用空间最大的一种数据类型,虽然占用更多内存空间,但相比保留原始数据类型,计算更简便,也不会发生数据溢出的错误
      3. 单元格大小取最大的那一个
      4. 收集全部非重复CRS,如果有不止一种空间参考,则取第一个

    有了数据集的元数据,我们就可以进行下一个步骤,计算LayoutDefinition了.

    2.2 计算LayoutDefinition

    在这一步,我们用到了传入的LayoutScheme对象,整理后的等效代码如下:

    // 1. 创建LayoutScheme对象
    val scheme: LayoutScheme = FloatingLayoutScheme(256)
    // 2. 获取LayoutDefinition对象
    val LayoutLevel(zoom, layout) = scheme.levelFor(extent, cellSize)
    

    代码分为两个步骤:

    1. 创建一个FloatingLayoutScheme(256)对象作为LayoutScheme
    2. 基于该LayoutScheme计算整个数据集范围的LayoutDefinition

    按顺序我们先从创建LayoutScheme开始.

    LayoutScheme即瓦片的布局方案.在Geotrellis中,存在三种布局方案,分别为:

    1. FloatLayoutScheme:非金字塔方案,适用于不需要金字塔的场景
    2. ZoomedLayoutScheme:类TMS方案,有金字塔结构,且对瓦片进行坐标系转换
    3. LocalLayoutSchme:不考虑空间参考的金字塔方案

    我们再回到最原始的需求:我们需要得到一张原始数据集的NDVI结果Tiff影像,不需要以TMS瓦片的形式生成分级结果,因此我们选择了FloatingLayoutScheme作为布局方案.

    FloatingLayoutScheme类定义整理后代码如下:

    [代码位于FloatingLayoutScheme.scala]

    object FloatingLayoutScheme {
      def apply(tileSize: Int): FloatingLayoutScheme =
        apply(tileSize, tileSize)
    }
    
    class FloatingLayoutScheme(val tileCols: Int, val tileRows: Int) extends LayoutScheme {
      // FloatingLayoutScheme返回的Zoom永远为0,因为逻辑上它只有一层.
      def levelFor(extent: Extent, cellSize: CellSize) =
        0 -> LayoutDefinition(GridExtent[Long](extent, cellSize), tileCols, tileRows)
    }
    

    我们创建了一个描述256x256尺寸的FloatingLayoutScheme,即在该方案下,原始数据集将只具有1层原始层,该层中每个瓦片的大小为256x256像素.

    选择256像素的瓦片与512像素的瓦片有何不同?

    抛开某些需要照顾前端显示的因素,从spark计算上来讲,瓦片的尺寸影响计算的粒度:理论上来说,瓦片越小,越能将计算量更均匀的分配在不同分区上,但同时也意味着需要更多的上下文切换时间,最终哪个对总体时间影响更大,需要实际测试,但一般来说,256或512像素的方案都是合适的.

    然后我们再看一下如何获取LayoutDefinition对象.

    从FloatingLayoutScheme生成LayoutDefinition的等效代码如下:

    val LayoutLevel(zoom, layout) = 0 -> LayoutDefinition(GridExtent[Long](extent, cellSize), tileCols, tileRows)
    

    这部分的主体是LayoutDefinition类,整理后代码如下:

    [代码位于LayoutDefinition.scala]

    object LayoutDefinition {
      def apply[N: Integral](grid: GridExtent[N], tileCols: Int, tileRows: Int): LayoutDefinition = {
        // 1. 计算Layout
        val extent = grid.extent
        val cellSize = grid.cellSize
        val totalPixelWidth = extent.width / cellSize.width
        val totalPixelHeight = extent.height / cellSize.height
        val tileLayoutCols = (totalPixelWidth / tileCols).ceil.toInt
        val tileLayoutRows = (totalPixelHeight / tileRows).ceil.toInt
        val layout = TileLayout(tileLayoutCols, tileLayoutRows, tileCols, tileRows)
        // 重新计算Extent
        val layoutExtent = Extent(
          extent.xmin,
          extent.ymax - (layout.totalRows * cellSize.height),
          extent.xmin + layout.totalCols * cellSize.width,
          extent.ymax
        )
        // 2. 生成LayoutDefinition对象
        LayoutDefinition(layoutExtent, layout)
      }
    }
    

    按顺序我们先从计算Layout开始.

    在这一步,我们需要重新梳理一下各种概念:

    • Extent:具有实际地理意义的范围,可能是经纬度范围或者投影坐标范围描述的一块实际区域
      • cellSize:与Extent直接关联,描述一个像素代表了多少度或米.
    • Layout:不具有地理意义,描述的是一个瓦片矩阵:单个瓦片的尺寸,以及瓦片有多少行多少列

    我们再来看看计算Layout的实际步骤:

    1. 通过Extent除以cellSize,可以反算出一行或一列有多少个像素
    2. 用行或列的总像素数除以单个瓦片的尺寸,可以得到一行或一列理论上有多少个瓦片.因为不能保证整除,因此需要向上取整,保留一定的冗余.
    3. 因为存在瓦片冗余,因此需要按带有冗余的行列数重新计算Extent,该Extent理论上不小于原始范围
    4. 根据Extent和Layout创建LayoutDefinition对象.

    我们为何需要与地理无关的Layout?

    这个问题可以延伸为:为何我们要将ProjectedExtent索引转换为SpatialKey索引?

    答案是效率.

    ProjectedExtent索引描述的是实际的空间范围,但并没有逻辑意义上的紧密关联关系.SpatialKey索引只记录行列号,通过迭代操作即可快速的定位瓦片,无需经过复杂的空间计算,而Geotrellis正是使用大量迭代操作进行并行计算的,因此SpatialKEy更加适合.同时,在我们需要让其具有地理空间意义时,也能通过相关的参数反算回来.这种操作一般来说是低频操作,其性能消耗是可以容忍的.

    通过观察LayoutDefinition的定义可以发现,它包含了Extent,cellSize与Layout信息,因此,他就是从ProjectedExtent向SpatialKey转换的桥梁:

    // GridExtent包含了Extent,cellSize信息
    class GridExtent[@specialized(Int, Long) N: Integral](
      val extent: Extent,
      val cellwidth: Double,
      val cellheight: Double,
      val cols: N,
      val rows: N
    ) extends Grid[N] with Serializable {
      import GridExtent._
    
      def this(extent: Extent, cols: N, rows: N) =
        this(extent, (extent.width / cols.toDouble), (extent.height / rows.toDouble), cols, rows)
        
      def this(extent: Extent, cellSize: CellSize) =
        this(extent, cellSize.width, cellSize.height,
          cols = Integral[N].fromDouble(math.round(extent.width / cellSize.width)),
          rows = Integral[N].fromDouble(math.round(extent.height / cellSize.height)))
    }
    
    // LayoutDefinition继承了GridExtent的Extent和cellSize,也包含了TileLayout信息
    case class LayoutDefinition(override val extent: Extent, tileLayout: TileLayout) extends GridExtent[Long](extent, tileLayout.cellSize(extent)) {
      // ...
    }
    

    我们得到LayoutDefinition后,就可以进行下一步:计算索引范围.

    2.3 计算索引范围

    在收集数据集信息阶段,我们获取了一个KeyBounds(SpatialKey(0,0),SpatialKey(0,0))对象,作为布局的初始索引范围,表明该布局的索引范围从SpatialKey(0,0)开始.因此我们在获取到整个数据集的空间范围后,需要计算出对应的结束索引.整理后代码如下:

    // 1. 地理范围转换到瓦片行列号
    val tileBounds : TileBounds = layout.mapTransform(extent)
    // 2. 设置该索引范围
    val _bounds = KeyBounds(tileBounds)
    val kb = bounds.setSpatialBounds(_bounds)
    

    代码分为两个步骤:

    1. 计算出在该LayoutDefinition下,数据集所描述的空间范围在用SpatialKey描述的值.
    2. 将该值赋予我们前面得到的KeyBounds(SpatialKey(0,0),SpatialKey(0,0))对象,得到正确的值.

    按顺序我们先从地理范围转换到瓦片行列号开始.

    mapTransform定义在LayoutDefinition中:

    case class LayoutDefinition(override val extent: Extent, tileLayout: TileLayout) extends GridExtent[Long](extent, tileLayout.cellSize(extent)) {
        // 为何mapTransform要使用Lazy定义?
        // 可以观察到LayoutDefinition在定义中覆写了父类的extent,即定义了一个重名变量
        // 不使用Lazy的话传入MapKeyTransform的extent就是父类的extent
        lazy val mapTransform = MapKeyTransform(extent, tileLayout.layoutCols, tileLayout.layoutRows)
    }
    

    这部分的主体是MapKeyTransform类,整理后代码如下:

    [代码位于MapKeyTransform.scala]

    class MapKeyTransform(val extent: Extent, val layoutCols: Int, val layoutRows: Int) extends Serializable {
    
      def apply(x: Double, y: Double): SpatialKey = {
        val tcol =
          ((x - extent.xmin) / extent.width) * layoutCols
        val trow =
          ((extent.ymax - y) / extent.height) * layoutRows
        (tcol.floor.toInt, trow.floor.toInt)
      }
    
      // 我们实际调用的layout.mapTransform(extent)方法
      def apply(otherExtent: Extent): TileBounds = {
        // 因为extent和otherExtent实际是一样的,因此colMin和rowMin都是0
        val SpatialKey(colMin, rowMin) = apply(otherExtent.xmin, otherExtent.ymax)
        val colMax = {
          // 由于otherExtent与extent一致,因此
          // otherExtent.xmax - extent.xmin = extent.width
          // 即d=layoutCols
          val d = (otherExtent.xmax - extent.xmin) / (extent.width / layoutCols)
          // 计算GridBounds的规律是保留西北边界,抛弃东南边界
          if(d == math.floor(d) && d != colMin) { d.toInt - 1 }
          else { d.toInt }
        }
    
        val rowMax = {
          val d = (extent.ymax - otherExtent.ymin) / (extent.height / layoutRows)
          if(d == math.floor(d) && d != rowMin) { d.toInt - 1 }
          else { d.toInt }
        }
    
        GridBounds(colMin, rowMin, colMax, rowMax)
      }
    }
    

    得到了结束索引,我们就可以将其赋值给KeyBounds,得到一个索引范围了:

    // 将TileBoudns对象转换为KeyBounds对象
    val _bounds = KeyBounds(tileBounds)
    // 合并索引范围
    val kb = bounds.setSpatialBounds(_bounds)
    

    这部分的主体是KeyBounds类,整理后代码如下:

    [代码位于KeyBounds.scala]

    object KeyBounds {
        def apply(gridBounds: TileBounds): KeyBounds[SpatialKey] =
            KeyBounds(SpatialKey(gridBounds.colMin, gridBounds.rowMin), SpatialKey(gridBounds.colMax, gridBounds.rowMax))
    }
    
    case class KeyBounds[+K](
      minKey: K,
      maxKey: K
    ) extends Bounds[K] {
        // 直接替换为新的TileBounds
        def setSpatialBounds[B >: K](other: KeyBounds[SpatialKey])(implicit ev: SpatialComponent[B]): KeyBounds[B] =
            KeyBounds((minKey: B).setComponent(other.minKey), (maxKey: B).setComponent(other.maxKey))
    }
    

    在这里出现了一个语法糖setComponent,可以直接让对象原地替换.它是如何实现的呢?

    这部分的主体是若干隐式转换,整理后代码如下:

    [代码位于utils/package.scala]

    implicit def setIdentityComponent[T, C <: T]: SetComponent[T, C] =
        SetComponent((_, v) => v)
    
    implicit class withSetComponentMethods[T](val self: T) extends MethodExtensions[T] {
        def setComponent[C](value: C)(implicit component: SetComponent[T, C]): T =
          component.set(self, value)
    }
    

    [代码位于utils/Setcomponent.scala]

    trait SetComponent[T, C] extends Serializable {
      def set: (T, C) => T
    }
    
    object SetComponent {
      def apply[T, C](_set: (T, C) => T): SetComponent[T, C] =
        new SetComponent[T, C] {
          val set = _set
        }
    }
    

    由代码可知,调用SpatialKey.setComponent时,Scala发现该方法存在于withSetComponentMethods类中,如果存在一个SetComponent[SpatialKey,SpatialKey]类型的对象,即可匹配成果,而该对象也可以通过隐式转换方法setIdentityComponent[SpatialKey,SpatialKey]得到,因此可以完成匹配.

    SetComponent[T,C]实现了一个set方法,可以按照给定的方法,将T类型的旧对象转换为C类型的新对象.在本例中,就是直接将SpatialKey类型的旧对象self抛弃,替换为SpatialKey类型的新对象other.minKey

    至此,所有所需的变量都准备就绪.

    3. 转换RDD索引

    在上一步我们提取元数据,因此可以将ProjectedExtent索引转换为SpatialKey索引了:

    val tiledRDD = geoTiffRDD.tileToLayout[SpatialKey](metadata, NearestNeighbor)
    

    这部分的主体是tileToLayout方法,其中含有几个隐式转换.整理后代码如下:

    [代码位于merge/Implicit.scala]

    // 类定义中要求存在隐式转换* => TileMergeMethods[MultibandTile]
    implicit class withMultibandMergeMethods(val self: MultibandTile) extends MultibandTileMergeMethods
    

    [代码位于MultibandTileMergeMethods.scala]

    trait MultibandTileMergeMethods extends TileMergeMethods[MultibandTile] {
        // ...
    }
    

    [代码位于Tiler.scala]

    // 将NearestNeighbor对象转换为Option类型对象
    implicit def methodToOptions(method: ResampleMethod): Options =
          Options(resampleMethod = method)
    

    [代码位于TilerMethod.scala]

    class TilerMethods[K, V <: CellGrid[Int]: ClassTag: * => TileMergeMethods[V]: * => TilePrototypeMethods[V]](val self: RDD[(K, V)]) extends MethodExtensions[RDD[(K, V)]] {
        def tileToLayout[K2: SpatialComponent: ClassTag](tileLayerMetadata: TileLayerMetadata[K2], options: Options)
              (implicit ev: K => TilerKeyMethods[K, K2]): RDD[(K2, V)] with Metadata[TileLayerMetadata[K2]] = {
              // 1. 将RDD按照LayoutDefinition重新分割
              val _rdd = CutTiles[K, K2, V](self, tileLayerMetadata.cellType, tileLayerMetadata.layout, options.resampleMethod)
              // 2. 合并冗余
              val _mergedRdd =  _rdd.merge(options.partitioner)
              // 3. 包装RDD
              ContextRDD(_rdd, tileLayerMetadata)
        }
    }
    

    代码分为3个步骤:

    1. 将RDD从原来ProjectedExtent按照LayoutDefinition重新分割
    2. 合并冗余
    3. 包装RDD,为下一步的操作做准备

    按顺序我们先从将RDD按照LayoutDefinition重新分割开始.

    3.1 将RDD按照LayoutDefinition重新分割

    因为ProjectedExtent与SpatialKey往往不是完全对应的,因此一个ProjectedExtent索引可能转换为多个SpatialKey索引,我们需要重新切割按ProjectedExtent索引定义的数据块,使之与新的SpatialKey对应.

    这部分的主体是CutTiles方法,整理后代码如下:

    [代码位于CutTiles.scala]

    object CutTiles {
    
      def apply[
        K1: * => TilerKeyMethods[K1, K2],
        K2: SpatialComponent: ClassTag,
        V <: CellGrid[Int]: ClassTag: * => TileMergeMethods[V]: * => TilePrototypeMethods[V]
      ] (
        rdd: RDD[(K1, V)],
        cellType: CellType,
        layoutDefinition: LayoutDefinition,
        resampleMethod: ResampleMethod = NearestNeighbor
      ): RDD[(K2, V)] = {
        val mapTransform = layoutDefinition.mapTransform
        
        // 这里我们设置的tileCols/Rows都是256
        val Dimensions(tileCols, tileRows) = layoutDefinition.tileLayout.tileDimensions
    
        rdd
            // 1. 将RDD展开
          .flatMap { tup =>
            // 2. 得到ProjectedExtent和MultibandTile
            val (inKey, tile) = tup
            // 3. 得到单个瓦片的空间范围
            val extent = inKey.extent
            // 4. 将该空间范围转为行列号范围
            mapTransform(extent)
              // 5. 遍历Extent对应对应的全部行列号
              .coordsIter
              .map  { spatialComponent => // (col,row) 通过隐式转换为SpatialKey
                // 6. ProjectedExtent索引转换为SpatialKey索引
                val outKey = inKey.translate(spatialComponent)
                // 7. 以给定的原型参数创建一个新ArrayMultibandTile对象,保留原本tile的波段数,没有数据
                val newTile = tile.prototype(cellType, tileCols, tileRows)
                // 8. 融合每个波段的数据
                // merge方法定义于MultibandTileMergeMethods.merge
                (outKey, newTile.merge(
                   mapTransform.keyToExtent(outKey.getComponent[SpatialKey]), // 再反算回这个SpatialKey所对应的extent,理论上不会比extent大
                   extent,
                   tile, // 原始Tile,有数据
                   resampleMethod
                 ))
              }
          }
      }
    }
    

    [代码位于MultibandTileMergeMethods.scala]

    trait MultibandTileMergeMethods extends TileMergeMethods[MultibandTile] {
        // 融合每个波段数据
        def merge(extent: Extent, otherExtent: Extent, other: MultibandTile, method: ResampleMethod): MultibandTile = {
            val bands: Seq[Tile] =
              for {
                // 1. 遍历每一个波段
                bandIndex <- 0 until self.bandCount
              } yield {
                // 2. 获取新Tile的band和原始Tile的对应band
                val thisBand = self.band(bandIndex)
                val thatBand = other.band(bandIndex)
                // 3. 按照给定的方式融合每个波段指定区域的数据
                thisBand.merge(extent, otherExtent, thatBand, method)
              }
            // 4. 返回ArrayMultibandTile
            ArrayMultibandTile(bands)
        }
    }
    

    [代码位于SinglebandTileMergeMethods.scala]

    def merge(extent: Extent, otherExtent: Extent, other: Tile, method: ResampleMethod): Tile =
    otherExtent & extent match {
      // sharedExtent是两个区域交集区域
      case Some(sharedExtent) =>
        val mutableTile = self.mutable
        // 1. 创建新Tile的RasterExtent对象,用于空间范围与行列号转换
        // cols/rows都是256
        val re = RasterExtent(extent, self.cols, self.rows)
        // 计算新的cellsize,用于重采样.但在我们使用的NearestNeighbor方法中没有使用
        val gridBounds = re.gridBoundsFor(sharedExtent)
        val targetCS = CellSize(sharedExtent, gridBounds.width, gridBounds.height)
    
        self.cellType match {
          case BitCellType | ByteCellType | UByteCellType | ShortCellType | UShortCellType | IntCellType  =>
            // 2. 定义重采样函数
            val interpolate: (Double, Double) => Int = Resample(method, other, otherExtent, targetCS).resample _
            // 3. 遍历新Tile的每一个像素
            cfor(0)(_ < self.rows, _ + 1) { row =>
              cfor(0)(_ < self.cols, _ + 1) { col =>
                // 如果新Tile的对应位置为0则赋值,因为新Tile不含数据,因此总是赋值
                if (self.get(col, row) == 0) {
                  //4. 反算新Tile的该位置在原始Tile上的对应位置
                  val (x, y) = re.gridToMap(col, row)
                  // 5. 获取并赋值
                  val v = interpolate(x, y)
                  if(isData(v)) {
                    mutableTile.set(col, row, v)
                  }
                }
              }
            }
            // ...省略其他类型,处理方法类似
        }
        mutableTile
      case _ =>
        self
    }
    

    代码分为如下两个主要步骤:

    1. 将原有的ProjectedExtent索引拆成若干SpatialKey索引.
    2. 针对每个SpatialKey索引,将其对应的空间范围内的每个波段的数据拷贝到一个按LayoutSchema定义的新瓦片上

    至此,我们已经完成了从ProjectedExtent索引向SpatialKey索引及其对应数据的转换.但转换过程中,由于ProjectedExtent与SpatialKey是一对多的关系,因此造成了多个相同SpatialKey描述原有一个ProjectedExtent不同位置的情况,需要通过合并数据来减少冗余.

    3.2 合并冗余

    合并冗余的代码如下:

    // options.partitioner为默认值None
    val _mergedRdd =  _rdd.merge(options.partitioner)
    

    这部分的主体是merge方法,该方法定义于隐式转换中.整理后代码如下:

    [代码位于merge/Implicit.scala]

    // 隐式类,为RDD对象赋予merge方法
    trait Implicits {
      implicit class withTileRDDMergeMethods[K: ClassTag, V <: CellGrid[Int]: ClassTag: * => TileMergeMethods[V]](self: RDD[(K, V)])
        extends TileRDDMergeMethods[K, V](self)
    }
    

    [代码位于TileRDDMergeMethods.scala]

    class TileRDDMergeMethods[K: ClassTag, V: ClassTag: * => TileMergeMethods[V]](val self: RDD[(K, V)]) extends MethodExtensions[RDD[(K, V)]] {
      def merge(partitioner: Option[Partitioner]): RDD[(K, V)] =
        TileRDDMerge(self, partitioner)
    }
    

    [代码位于TileRDDMerge.scala]

    object TileRDDMerge {
      def apply[K: ClassTag, V: ClassTag: * => TileMergeMethods[V]](rdd: RDD[(K, V)], partitioner: Option[Partitioner]): RDD[(K, V)] = {
        // 实际调用了这个方法
        rdd.reduceByKey(_ merge _)
      }
    }
    

    由代码可知,最终针对结果rdd调用了reduceByKey方法,对持有相同SpatialKey键的对象进行融合.融合的方法是隐式方法:

    [代码位于MultibandTileMergeMethods.scala]

    trait MultibandTileMergeMethods extends TileMergeMethods[MultibandTile] {
      def merge(other: T): T = merge(other, 0, 0)
      def merge(other: MultibandTile, baseCol: Int, baseRow: Int): MultibandTile = {
        val bands: Seq[Tile] =
          for {
            bandIndex <- 0 until self.bandCount
          } yield {
            val thisBand = self.band(bandIndex)
            val thatBand = other.band(bandIndex)
            // 调用了SinglebandTileMergeMethods的merge方法
            thisBand.merge(thatBand, baseCol, baseRow)
          }
        ArrayMultibandTile(bands)
      }
    }
    

    [代码位于SinglebandTileMergeMethods.scala]

    trait SinglebandTileMergeMethods extends TileMergeMethods[Tile] {
      // 针对整个瓦片,不再限制区域
      def merge(other: Tile, baseCol: Int, baseRow: Int): Tile = {
        val mutableTile = self.mutable
        self.cellType match {
          case ByteCellType | UByteCellType | ShortCellType | UShortCellType | IntCellType  =>
            cfor(0)(_ < other.rows, _ + 1) { row =>
              cfor(0)(_ < other.cols, _ + 1) { col =>
                if (self.get(col + baseCol, row + baseRow) == 0) {
                  mutableTile.set(col + baseCol, row + baseRow, other.get(col, row))
                }
              }
            }
            // 省略其他类型
        }
    }
    

    经过合并,整个数据集完成从ProjectedExtent索引到SpatialKey索引的完美转换.

    3.3 包装RDD

    通过包装,为原本的RDD附着了元数据属性,方便后面的计算操作.包装RDD的代码如下:

    ContextRDD(_rdd, tileLayerMetadata)
    

    这部分的主体是ContextRDD方法,整理后代码如下:

    [代码位于ContextRDD.scala]

    object ContextRDD {
      def apply[K, V, M](rdd: RDD[(K, V)], metadata: M): RDD[(K, V)] with Metadata[M] =
        new ContextRDD(rdd, metadata)
    }
    
    class ContextRDD[K, V, M](val rdd: RDD[(K, V)], val metadata: M) extends RDD[(K, V)](rdd) with Metadata[M] {
        // ...
    }
    

    为了方便使用,Geotrellis将具有元数据的RDD对象定义为一种新的类型:

    [代码位于spark/package.scala]

    type MultibandTileLayerRDD[K] = RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]
    

    至此,转换索引就结束了,我们得到了一个MultibandTileLayerRDD[SpatialKey]对象.

    4. 总结

    在本篇文章中,我们剖析了Geotrellis是如何将具有原始ProjectedExtent索引的数据转换为更便于迭代计算的SpatialKey索引的数据.主要分为两个步骤:

    1. 计算元数据,为重新分割原始数据做准备
    2. 正式分割数据,将数据按SpatialKey重新整理.

    SpatialKey与TemporalSpaceKey是Geotrellis中最常用的两种索引,本质上都是将连续的地理空间范围转换为便于迭代计算的整数序列,为后面的实际计算做好准备.

    相关文章

      网友评论

        本文标题:How it works(21) Geotrellis是如何在S

        本文链接:https://www.haomeiwen.com/subject/vpqkbltx.html