An Overview of How Cache Operations Are Implemented in Spark

Author: 分裂四人组 | Published 2019-04-16 17:44

    Expressing Cache Semantics in Spark

    • SQL supports a CACHE statement;
    • the Catalog API supports caching tables;
    • Dataset supports the cache/persist methods;

    1. SQL support for the CACHE statement

    SQL exposes caching by registering the user's query as a temporary view and then calling catalog.cacheTable() on it; see the CacheTableCommand implementation below. The relevant grammar rules are:

    CACHE LAZY? TABLE tableIdentifier (AS? query)?                   #cacheTable
    UNCACHE TABLE (IF EXISTS)? tableIdentifier                       #uncacheTable
    CLEAR CACHE                                                      #clearCache
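
    A minimal sketch of issuing these statements through a SparkSession; the table names src and cached_src are made up for illustration:

    // Eager by default: CacheTableCommand materializes the view right away.
    spark.sql("CACHE TABLE cached_src AS SELECT * FROM src")
    // LAZY defers materialization until the cached plan is first used.
    spark.sql("CACHE LAZY TABLE src")
    // Drop a single cache entry, or wipe every entry.
    spark.sql("UNCACHE TABLE IF EXISTS cached_src")
    spark.sql("CLEAR CACHE")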
    

    The CacheTableCommand logic:

    case class CacheTableCommand(
        tableIdent: TableIdentifier,
        plan: Option[LogicalPlan],
        isLazy: Boolean) extends RunnableCommand {
      require(plan.isEmpty || tableIdent.database.isEmpty,
        "Database name is not allowed in CACHE TABLE AS SELECT")
    
      override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
    
      override def run(sparkSession: SparkSession): Seq[Row] = {
        // Register the query (if any) as a temporary view
        plan.foreach { logicalPlan =>
          Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
        }
        // Cache the table through the Catalog API
        sparkSession.catalog.cacheTable(tableIdent.quotedString)
        
        // For non-lazy caching, trigger execution with count() to materialize the view eagerly
        if (!isLazy) {
          // Performs eager caching
          sparkSession.table(tableIdent).count()
        }
    
        Seq.empty[Row]
      }
    }
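
    The command therefore boils down to three steps that can be reproduced by hand; a rough sketch of what CACHE TABLE t AS SELECT * FROM src does (the names t and src are made up):

    spark.sql("SELECT * FROM src").createTempView("t") // 1. register the query as a temp view
    spark.catalog.cacheTable("t")                      // 2. register the plan with the CacheManager
    spark.table("t").count()                           // 3. eager path only: materialize via count()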
    

    The actual caching machinery lives in CacheManager; its main entry point is the cacheQuery method:

      /**
       * Caches the data produced by the logical representation of the given [[Dataset]].
       * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
       * recomputing the in-memory columnar representation of the underlying table is expensive.
       */
      def cacheQuery(
          query: Dataset[_],
          tableName: Option[String] = None,
          storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
        val planToCache = query.logicalPlan
        
        if (lookupCachedData(planToCache).nonEmpty) {
          // The plan is already cached; ignore the request
          logWarning("Asked to cache already cached data.")
        } else {
          val sparkSession = query.sparkSession
          // Build an InMemoryRelation from the plan.
          // Why an InMemoryRelation? It wraps the executed plan in a structure that can be
          // kept in memory, so when the same plan shows up again it is answered from this
          // relation instead of being re-executed. Nothing runs at this point: the step is
          // purely logical, and physical materialization must be triggered separately.
          val inMemoryRelation = InMemoryRelation(
            sparkSession.sessionState.conf.useCompression,
            sparkSession.sessionState.conf.columnBatchSize, storageLevel,
            sparkSession.sessionState.executePlan(planToCache).executedPlan,
            tableName,
            planToCache.stats)
          cachedData.add(CachedData(planToCache, inMemoryRelation))
        }
      }
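
    The effect is easy to observe from the user side. A small sketch (df and its data are invented for illustration): once an action materializes the cache, any plan that matches the cached one is answered from the InMemoryRelation, which shows up as an InMemoryTableScan node in explain() output:

    val df = spark.range(0, 1000).selectExpr("id", "id % 10 AS bucket")
    df.cache()    // registers an InMemoryRelation with the CacheManager; nothing runs yet
    df.count()    // first action materializes the cached columnar batches
    df.groupBy("bucket").count().explain()  // plan now contains InMemoryTableScan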
    

    2. Catalog support for caching tables

      /**
       * Caches the specified table with the given storage level.
       *
       * @param tableName is either a qualified or unqualified name that designates a table/view.
       *                  If no database identifier is provided, it refers to a temporary view or
       *                  a table/view in the current database.
       * @param storageLevel storage level to cache table.
       * @since 2.3.0
       */
      def cacheTable(tableName: String, storageLevel: StorageLevel): Unit
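
    A quick usage sketch of the Catalog API; the table name people is hypothetical:

    import org.apache.spark.storage.StorageLevel

    spark.catalog.cacheTable("people", StorageLevel.DISK_ONLY) // explicit level, Spark 2.3+
    spark.catalog.isCached("people")                           // => true
    spark.catalog.uncacheTable("people")                       // drop the cache entry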
    

    3. The cache/persist methods on Dataset

      /**
       * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
       *
       * @group basic
       * @since 1.6.0
       */
      def cache(): this.type = persist()
    
      /**
       * Persist this Dataset with the given storage level.
       * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
       *                 `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
       *                 `MEMORY_AND_DISK_2`, etc.
       *
       * @group basic
       * @since 1.6.0
       */
      def persist(newLevel: StorageLevel): this.type = {
        sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
        this
      }
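
    And the matching sketch on the Dataset side; the input path /tmp/events is invented:

    import org.apache.spark.storage.StorageLevel

    val events = spark.read.parquet("/tmp/events")
    events.persist(StorageLevel.MEMORY_ONLY) // or events.cache() for the MEMORY_AND_DISK default
    events.count()                           // action materializes the cache
    events.unpersist()                       // removes the entry from the CacheManager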
    
