美文网首页
为 Delta 新增 Upsert(Merge)功能

为 Delta 新增 Upsert(Merge)功能

作者: 祝威廉 | 来源:发表于2019-06-10 16:13 被阅读0次

    前言

    今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。

    Delta的核心是DeltaLog,其实就是元数据管理。通过该套元数据管理,我们可以很容易的将Compaction,Update,Upsert,Delete等功能加上,因为本质上就是调用元数据管理API完成数据最后的提交。

    代码使用方式

    Upsert支持流式和批的方式进行更新。因为受限于Spark的SQL解析,大家可以使用Dataframe 或者 MLSQL的方式进行调用。

    批使用方式:

    val log = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
    val upsertTableInDelta = UpsertTableInDelta(data, Option(SaveMode.Append), None, log,
                new DeltaOptions(Map[String, String](), df.sparkSession.sessionState.conf),
                Seq(),
                Map("idCols" -> "key,value"))
    val items = upsertTableInDelta.run(df.sparkSession)
    

    唯一需要大家指定的就是 idCols, 也就是你的表的唯一主键组合是啥。比如我这里是key,value两个字段组成唯一主键。

    流使用技巧是一模一样的,只需要做一点点修改:

     UpsertTableInDelta(data, None, Option(OutputMode.Append())
    

    UpsertTableInDelta 根据你设置的是SaveMode还是OutputMode来看是不是流写入。

    MLSQL 使用方式

    写入数据到Kafka:

    set abc='''
    { "x": 100, "y": 201, "z": 204 ,"dataType":"A group"}
    ''';
    load jsonStr.`abc` as table1;
    
    select to_json(struct(*)) as value from table1 as table2;
    save append table2 as kafka.`wow` where 
    kafka.bootstrap.servers="127.0.0.1:9092";
    
    

    使用流程序消费Kafka:

    -- the stream name, should be uniq.
    set streamName="kafkaStreamExample";
    
    !kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;
    
    -- convert table as stream source
    load kafka.`wow` options 
    kafka.bootstrap.servers="127.0.0.1:9092"
    and failOnDataLoss="false"
    as newkafkatable1;
    
    -- aggregation 
    select *  from newkafkatable1
    as table21;
    
    -- output the the result to console.
    save append table21  
    as rate.`/tmp/delta/wow-0` 
    options mode="Append"
    and idCols="x,y"
    and duration="5"
    and checkpointLocation="/tmp/s-cpl6";
    

    同样的,我们设置了idCols,指定x,y为唯一主键。

    然后查看对应的记录变化:

    load delta.`/tmp/delta/wow-0` as show_table1;
    select * from show_table1 where x=100 and z=204 as output;
    

    你会惊喜的发现数据可以更新了。

    实现剖析

    一共涉及到三个新文件:

    org.apache.spark.sql.delta.commands.UpsertTableInDelta
    org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource
    org.apache.spark.sql.delta.sources.MLSQLDeltaSink
    

    对应源码参看我fork的delta项目: mlsql-delta

    第一个文件是实现核心的更新逻辑。第二个第三个支持Spark的datasource API来进行批和流的写入。

    这篇文章我们主要介绍UpsertTableInDelta。

    case class UpsertTableInDelta(_data: Dataset[_],
                                  saveMode: Option[SaveMode],
                                  outputMode: Option[OutputMode],
                                  deltaLog: DeltaLog,
                                  options: DeltaOptions,
                                  partitionColumns: Seq[String],
                                  configuration: Map[String, String]
                                 ) extends RunnableCommand
      with ImplicitMetadataOperation
      with DeltaCommand with DeltaCommandsFun {
    

    UpsertTableInDelta 集成了delta一些必要的基础类,ImplicitMetadataOperation,DeltaCommand,主要是为了方便得到一些操作日志写入的方法。

    saveMode 和 outputMode 主要是为了方便区分现在是流在写,还是批在写,以及写的模式是什么。

    assert(configuration.contains(UpsertTableInDelta.ID_COLS), "idCols is required ")
    
        if (outputMode.isDefined) {
          assert(outputMode.get == OutputMode.Append(), "append is required ")
        }
    
        if (saveMode.isDefined) {
          assert(saveMode.get == SaveMode.Append, "append is required ")
        }
    

    限制条件是必须都是用Append模式,并且idCols是必须存在的。

    saveMode match {
          case Some(mode) =>
            deltaLog.withNewTransaction { txn =>
              actions = upsert(txn, sparkSession)
              val operation = DeltaOperations.Write(SaveMode.Overwrite,
                Option(partitionColumns),
                options.replaceWhere)
              txn.commit(actions, operation)
            }
          case None => outputMode match {
    

    如果是批写入,那么直接调用deltaLog开启一个新的事物,然后进行upsert操作。同时进行commit,然后就搞定了。

    如果是流写入则麻烦一点,

    case None => outputMode match {
            case Some(mode) =>
              val queryId = sparkSession.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
              assert(queryId != null)
    
              if (SchemaUtils.typeExistsRecursively(_data.schema)(_.isInstanceOf[NullType])) {
                throw DeltaErrors.streamWriteNullTypeException
              }
    
              val txn = deltaLog.startTransaction()
              // Streaming sinks can't blindly overwrite schema.
              // See Schema Management design doc for details
              updateMetadata(
                txn,
                _data,
                partitionColumns,
                configuration = Map.empty,
                false)
    
              val currentVersion = txn.txnVersion(queryId)
              val batchId = configuration(UpsertTableInDelta.BATCH_ID).toLong
              if (currentVersion >= batchId) {
                logInfo(s"Skipping already complete epoch $batchId, in query $queryId")
              } else {
                actions = upsert(txn, sparkSession)
                val setTxn = SetTransaction(queryId,
                  batchId, Some(deltaLog.clock.getTimeMillis())) :: Nil
                val info = DeltaOperations.StreamingUpdate(outputMode.get, queryId, batchId)
                txn.commit(setTxn ++ actions, info)
              }
          }
        }
    

    首选我们获取queryId,因为在delta里需要使用queryId获取事务ID(batchId),并且最后写完成之后的会额外写入一些数据到元数据里,也需要queryId。

    updateMetadata 主要是为了检测schema信息,譬如如果stream 是complte模式,那么是直接覆盖的,而如果是其他模式,则需要做schema合并。

    如果我们发现当前事务ID>batchId,说明这个已经运行过了,跳过。如果没有,则使用upsert进行实际的操作。最后设置一些额外的信息提交。

    upsert 方法

    upsert的基本逻辑是:

    1. 获取idCols是不是有分区字段,如果有,先根据分区字段过滤出所有的文件。
    2. 如果没有分区字段,则得到所有的文件
    3. 将这些文件转化为dataframe
    4. 和新写入的dataframe进行join操作,得到受影响的行(需要更新的行),然后得到这些行所在的文件。
    5. 获取这些文件里没有无需变更的记录,写成新文件。
    6. 删除这些文件
    7. 将新数据写成新文件

    4,5两个步骤需要对数据进行join,但是在Spark里静态表并不能直接join流表,所以我们需要将流表转化为静态表。

    def upsert(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
    
        // if _data is stream dataframe, we should convert it to normal
        // dataframe and so we can join it later
        val data = if (_data.isStreaming) {
          class ConvertStreamDataFrame[T](encoder: ExpressionEncoder[T]) {
    
            def toBatch(data: Dataset[_]): Dataset[_] = {
              val resolvedEncoder = encoder.resolveAndBind(
                data.logicalPlan.output,
                data.sparkSession.sessionState.analyzer)
              val rdd = data.queryExecution.toRdd.map(resolvedEncoder.fromRow)(encoder.clsTag)
              val ds = data.sparkSession.createDataset(rdd)(encoder)
              ds
            }
          }
          new ConvertStreamDataFrame[Row](_data.asInstanceOf[Dataset[Row]].exprEnc).toBatch(_data)
        } else _data
    
    

    上述代码就是将流表转化为普通静态表。接着我们需要拿到主键字段里满足分区字段的字段,然后获取他们的min/max值

    val minMaxColumns = partitionColumnsInIdCols.flatMap { column =>
            Seq(F.lit(column), F.min(column).as(s"${column}_min"), F.max(F.max(s"${column}_max")))
          }.toArray
          val minxMaxKeyValues = data.select(minMaxColumns: _*).collect()
    

    最后得到过滤条件:

    // build our where statement
          val whereStatement = minxMaxKeyValues.map { row =>
            val column = row.getString(0)
            val minValue = row.get(1).toString
            val maxValue = row.get(2).toString
    
            if (isNumber(column)) {
              s"${column} >= ${minValue} and   ${maxValue} >= ${column}"
            } else {
              s"""${column} >= "${minValue}" and   "${maxValue}" >= ${column}"""
            }
          }
          logInfo(s"whereStatement: ${whereStatement.mkString(" and ")}")
          val predicates = parsePartitionPredicates(sparkSession, whereStatement.mkString(" and "))
          Some(predicates)
    

    现在可以得到所有相关的文件了:

    val filterFilesDataSet = partitionFilters match {
          case None =>
            snapshot.allFiles
          case Some(predicates) =>
            DeltaLog.filterFileList(
              metadata.partitionColumns, snapshot.allFiles.toDF(), predicates).as[AddFile]
        }
    

    将这些文件转化为dataframe,并且将里面的每条记录都带上所属文件的路径:

    // Again, we collect all files to driver,
        // this may impact performance and even make the driver OOM when
        // the number of files are very huge.
        // So please make sure you have configured the partition columns or make compaction frequently
    
        val filterFiles = filterFilesDataSet.collect
        val dataInTableWeShouldProcess = deltaLog.createDataFrame(snapshot, filterFiles, false)
    
        val dataInTableWeShouldProcessWithFileName = dataInTableWeShouldProcess.
          withColumn(UpsertTableInDelta.FILE_NAME, F.input_file_name())
    

    通过Join获取哪些文件里面的记录需要被更新:

    // get all files that are affected by the new data(update)
        val filesAreAffected = dataInTableWeShouldProcessWithFileName.join(data,
          usingColumns = idColsList,
          joinType = "inner").select(UpsertTableInDelta.FILE_NAME).
          distinct().collect().map(f => f.getString(0))
    val tmpFilePathSet = filesAreAffected.map(f => f.split("/").last).toSet
    
        val filesAreAffectedWithDeltaFormat = filterFiles.filter { file =>
          tmpFilePathSet.contains(file.path.split("/").last)
        }
    
        val deletedFiles = filesAreAffectedWithDeltaFormat.map(_.remove)
    

    将需要删除的文件里没有改变的记录单独拿出来写成新文件:

    // we should get  not changed records in affected files and write them back again
        val affectedRecords = deltaLog.createDataFrame(snapshot, filesAreAffectedWithDeltaFormat, false)
    
        val notChangedRecords = affectedRecords.join(data,
          usingColumns = idColsList, joinType = "leftanti").
          drop(F.col(UpsertTableInDelta.FILE_NAME))
    val notChangedRecordsNewFiles = txn.writeFiles(notChangedRecords, Some(options))
    

    最后将我们新增数据写入:

    val newFiles = txn.writeFiles(data, Some(options))
    

    因为第一次写入的时候,schema还没有形成,所以不能走upsert逻辑,而是需要直接写入,这里我偷懒,没有把逻辑写在UpsertTableInDelta里,而是写在了MLSQLDeltaSink里:

    override def addBatch(batchId: Long, data: DataFrame): Unit = {
        val metadata = deltaLog.snapshot.metadata
        val readVersion = deltaLog.snapshot.version
        val isInitial = readVersion < 0
        if (!isInitial && parameters.contains(UpsertTableInDelta.ID_COLS)) {
          UpsertTableInDelta(data, None, Option(outputMode), deltaLog,
            new DeltaOptions(Map[String, String](), sqlContext.sparkSession.sessionState.conf),
            Seq(),
            Map(UpsertTableInDelta.ID_COLS -> parameters(UpsertTableInDelta.ID_COLS),
              UpsertTableInDelta.BATCH_ID -> batchId.toString
            )).run(sqlContext.sparkSession)
    
        } else {
          super.addBatch(batchId, data)
        }
      }
    

    总结

    Delta 具备了数据的增删改查能力,同时流批共享,并发修改控制,加上小文件compaction功能,基本解决了我们之前在使用流计算遇到的大部分问题。后续持续优化delta的查询功能,相信前景无限。

    相关文章

      网友评论

          本文标题:为 Delta 新增 Upsert(Merge)功能

          本文链接:https://www.haomeiwen.com/subject/voczxctx.html