[Spark Series 10] spark logicalPlan Sta

Author: 鸿乃江边鸟 | Published 2021-01-11 11:53


    This article is based on Spark 3.0.1.


    Logical-plan statistics are important for optimization decisions such as choosing a BroadcastHashJoin or applying dynamic partition pruning. This article looks at how Spark obtains these statistics.
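To make that link concrete, here is a minimal Scala sketch of the size check Spark's join selection applies before broadcasting one side of a join: the estimated sizeInBytes must be non-negative and no larger than spark.sql.autoBroadcastJoinThreshold (10 MB by default). EstimatedStats and canBroadcastBySize are simplified, hypothetical stand-ins, not Spark's own classes.

```scala
// Hypothetical model of the size-based broadcast check; not Spark's own classes.
final case class EstimatedStats(sizeInBytes: BigInt)

// Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024

// A side qualifies when its estimated size is non-negative and within the threshold.
def canBroadcastBySize(stats: EstimatedStats, threshold: Long): Boolean =
  stats.sizeInBytes >= BigInt(0) && stats.sizeInBytes <= BigInt(threshold)

val smallDim = EstimatedStats(BigInt(2L * 1024 * 1024)) // a 2 MB dimension table
val unknown  = EstimatedStats(BigInt(Long.MaxValue))    // no real stats: huge default size

println(canBroadcastBySize(smallDim, autoBroadcastJoinThreshold))
println(canBroadcastBySize(unknown, autoBroadcastJoinThreshold))
```

Because a missing size estimate falls back to a very large default, a relation without statistics is never broadcast on size alone; setting the threshold to -1 disables size-based broadcasting entirely.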

    trait LogicalPlanStats { self: LogicalPlan =>

      /**
       * Returns the estimated statistics for the current logical plan node. Under the hood, this
       * method caches the return value, which is computed based on the configuration passed in the
       * first time. If the configuration changes, the cache can be invalidated by calling
       * [[invalidateStatsCache()]].
       */
      def stats: Statistics = statsCache.getOrElse {
        if (conf.cboEnabled) {
          // CBO on: estimation may use row counts and column statistics
          statsCache = Option(BasicStatsPlanVisitor.visit(self))
        } else {
          // CBO off: only sizeInBytes is estimated
          statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
        }
        statsCache.get
      }

      /** A cache for the estimated statistics, such that it will only be computed once. */
      protected var statsCache: Option[Statistics] = None

      /** Invalidates the stats cache. See [[stats]] for more information. */
      final def invalidateStatsCache(): Unit = {
        statsCache = None
      }
    }
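The comment on stats says the value is computed with the configuration in effect the first time and then reused until invalidateStatsCache() is called. The toy Scala model below (a hypothetical PlanNode, not Spark's trait) shows that behaviour: flipping a CBO-like flag has no effect until the cache is cleared.

```scala
// Toy model of LogicalPlanStats' caching; PlanNode and its fields are hypothetical.
final case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

class PlanNode(size: BigInt, rows: BigInt) {
  var cboEnabled: Boolean = false // hypothetical stand-in for conf.cboEnabled
  var computeCount: Int = 0       // how many times statistics were actually computed

  private var statsCache: Option[Stats] = None

  def stats: Stats = statsCache.getOrElse {
    computeCount += 1
    // With the flag on we also report a row count; otherwise only the size.
    val computed = if (cboEnabled) Stats(size, Some(rows)) else Stats(size)
    statsCache = Some(computed)
    computed
  }

  def invalidateStatsCache(): Unit = {
    statsCache = None
  }
}

val node = new PlanNode(BigInt(1000), BigInt(10))
val first = node.stats   // computed with the flag off: size only
node.cboEnabled = true
val second = node.stats  // served from the cache, so still size only
node.invalidateStatsCache()
val third = node.stats   // recomputed with the flag on: now carries a row count
println(s"first=$first second=$second third=$third computeCount=${node.computeCount}")
```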


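    When a node type has no dedicated rule, the visitor falls back to its default method. In SizeInBytesOnlyStatsPlanVisitor it looks like this, so leaf nodes are where the numbers originate:

    override def default(p: LogicalPlan): Statistics = p match {
      // leaf nodes report their own statistics
      case p: LeafNode => p.computeStats()
      // any other node: product of the children's sizes (for a single child, just its size)
      case _: LogicalPlan => Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product)
    }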
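The fallback rule is easy to reproduce on a toy tree. In this sketch Plan, Leaf and NonLeaf are hypothetical stand-ins for LogicalPlan and LeafNode: leaves report their own size and every other node multiplies its children's sizes. This is also what an inner join ends up with in SizeInBytesOnlyStatsPlanVisitor, whose visitJoin delegates to default for everything except left semi and left anti joins.

```scala
// Hypothetical stand-ins for LogicalPlan / LeafNode, reduced to what `default` needs.
trait Plan { def children: Seq[Plan] }
final case class Leaf(size: BigInt) extends Plan { def children: Seq[Plan] = Nil }
final case class NonLeaf(children: Seq[Plan]) extends Plan

// Mirrors the fallback rule: a leaf reports its own size (computeStats() in Spark),
// any other node multiplies its children's sizes.
def sizeInBytes(p: Plan): BigInt = p match {
  case Leaf(size)        => size
  case NonLeaf(children) => children.map(sizeInBytes).product
}

// A join-like node over a 100-byte and a 20-byte input.
val plan = NonLeaf(Seq(Leaf(BigInt(100)), Leaf(BigInt(20))))
println(sizeInBytes(plan))
```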


    • For v1 sources, take HiveTableRelation as an example:
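    override def computeStats(): Statistics = {
      // convert catalog statistics into plan statistics; row counts and column stats
      // are only used when CBO or plan statistics are enabled
      tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
        .getOrElse {
          throw new IllegalStateException("table stats must be specified.")
        }
    }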
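toPlanStats decides how much of the catalog statistics reach the plan. The simplified sketch below uses hypothetical CatalogStats and PlanStats classes and assumes a fixed per-row size (Spark instead derives the row size from column statistics and data types). With cboEnabled || planStatsEnabled and a known row count, the size is estimated from the rows and the row count is kept; otherwise only sizeInBytes is propagated. A missing catalog entry throws, as in HiveTableRelation.

```scala
// Hypothetical, simplified stand-ins for CatalogStatistics and plan-level Statistics.
final case class CatalogStats(sizeInBytes: BigInt, rowCount: Option[BigInt])
final case class PlanStats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

// Assumed fixed per-row size; Spark derives it from column statistics and data types instead.
val assumedRowSize: BigInt = BigInt(16)

// With plan stats enabled and a row count available, estimate size as rows * row size
// and keep the row count; otherwise propagate sizeInBytes only.
def toPlanStats(cs: CatalogStats, planStatsEnabled: Boolean): PlanStats =
  cs.rowCount match {
    case Some(rows) if planStatsEnabled => PlanStats(rows * assumedRowSize, Some(rows))
    case _                              => PlanStats(cs.sizeInBytes)
  }

// Same shape as HiveTableRelation.computeStats: missing catalog stats are an error.
def computeStats(tableStats: Option[CatalogStats], cboEnabled: Boolean, planStatsEnabled: Boolean): PlanStats =
  tableStats
    .map(toPlanStats(_, cboEnabled || planStatsEnabled))
    .getOrElse(throw new IllegalStateException("table stats must be specified."))

val analyzed = CatalogStats(sizeInBytes = BigInt(5000), rowCount = Some(BigInt(100)))
println(computeStats(Some(analyzed), cboEnabled = false, planStatsEnabled = false))
println(computeStats(Some(analyzed), cboEnabled = true, planStatsEnabled = false))
```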


    • For v2 sources, take DataSourceV2Relation as an example:
    override def computeStats(): Statistics = {
      if (Utils.isTesting) {
        // when testing, throw an exception if this computeStats method is called because stats should
        // not be accessed before pushing the projection and filters to create a scan. otherwise, the
        // stats are not accurate because they are based on a full table scan of all columns.
        throw new IllegalStateException(
          s"BUG: computeStats called before pushdown on DSv2 relation: $name")
      } else {
        // when not testing, return stats because bad stats are better than failing a query
        table.asReadable.newScanBuilder(options) match {
          case r: SupportsReportStatistics =>
            val statistics = r.estimateStatistics()
            DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
          case _ =>
            Statistics(sizeInBytes = conf.defaultSizeInBytes)
        }
      }
    }
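Outside tests, the v2 path therefore depends on whether the builder reports statistics, and any missing value falls back to a default. The following sketch mirrors that shape with hypothetical traits (ScanBuilderLike, ReportsStatistics, V2Statistics) built on java.util.OptionalLong, and assumes spark.sql.defaultSizeInBytes keeps its default of Long.MaxValue.

```scala
import java.util.OptionalLong

// Hypothetical stand-ins for the DSv2 read interfaces (not Spark's own types).
trait V2Statistics {
  def sizeInBytes(): OptionalLong
  def numRows(): OptionalLong
}
trait ScanBuilderLike
trait ReportsStatistics extends ScanBuilderLike { def estimateStatistics(): V2Statistics }

final case class PlanStats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

// Default of spark.sql.defaultSizeInBytes: Long.MaxValue, i.e. "assume it is huge".
val defaultSizeInBytes: Long = Long.MaxValue

// Same shape as transformV2Stats: absent values fall back to the supplied defaults.
def transformV2Stats(s: V2Statistics, defaultRowCount: Option[BigInt], defaultSize: Long): PlanStats = {
  val rows =
    if (s.numRows().isPresent) Some(BigInt(s.numRows().getAsLong)) else defaultRowCount
  PlanStats(BigInt(s.sizeInBytes().orElse(defaultSize)), rows)
}

// Same shape as the non-testing branch of DataSourceV2Relation.computeStats.
def computeStats(builder: ScanBuilderLike): PlanStats = builder match {
  case r: ReportsStatistics => transformV2Stats(r.estimateStatistics(), None, defaultSizeInBytes)
  case _                    => PlanStats(BigInt(defaultSizeInBytes))
}

val reporting: ScanBuilderLike = new ReportsStatistics {
  def estimateStatistics(): V2Statistics = new V2Statistics {
    def sizeInBytes(): OptionalLong = OptionalLong.of(4096L)
    def numRows(): OptionalLong = OptionalLong.empty()
  }
}
val silent: ScanBuilderLike = new ScanBuilderLike {}

println(computeStats(reporting))
println(computeStats(silent))
```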

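    It calls table.asReadable.newScanBuilder(options) directly. If the builder implements SupportsReportStatistics, its estimateStatistics method supplies the numbers; otherwise the size falls back to conf.defaultSizeInBytes. Table, SupportsRead and SupportsReportStatistics all belong to the DataSource V2 API as reworked in Spark 3. The statistics themselves come from Scan implementations; ParquetScan, for example, inherits estimateStatistics from FileScan: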

    override def estimateStatistics(): Statistics = {
      new Statistics {
        override def sizeInBytes(): OptionalLong = {
          // scale the on-disk size by spark.sql.sources.fileCompressionFactor
          val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
          val size = (compressionFactor * fileIndex.sizeInBytes).toLong
          OptionalLong.of(size)
        }
        override def numRows(): OptionalLong = OptionalLong.empty()
      }
    }
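In other words, the file-based estimate is just the listed file size scaled by spark.sql.sources.fileCompressionFactor (1.0 by default), with no row count. A minimal sketch, using a hypothetical FileListing in place of Spark's file index:

```scala
import java.util.OptionalLong

// Hypothetical stand-in for the file index: just the sizes of the listed files.
final case class FileListing(fileSizes: Seq[Long]) {
  def sizeInBytes: Long = fileSizes.sum
}

final case class ScanStats(sizeInBytes: OptionalLong, numRows: OptionalLong)

// Size = compression factor * total file size; the row count is left unknown.
def estimateStatistics(index: FileListing, fileCompressionFactor: Double = 1.0): ScanStats = {
  val size = (fileCompressionFactor * index.sizeInBytes).toLong
  ScanStats(OptionalLong.of(size), OptionalLong.empty())
}

val index = FileListing(Seq(1000L, 3000L))
println(estimateStatistics(index))                              // factor 1.0 (the default)
println(estimateStatistics(index, fileCompressionFactor = 2.5)) // scaled-up estimate
```

A factor above 1.0 is a way to compensate for compressed formats such as Parquet, whose on-disk size can understate the data size after decompression.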

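    So v2 sources carry no column statistics, at least in this version: the v2 Statistics interface only exposes sizeInBytes and numRows. By contrast, some v1 sources do have column statistics, though not all, since computing statistics for every column is expensive.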



