这个有两种方法
1 使用zipWithUniqueId
获取id 并重建 DataFrame.
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df =Seq(("a", -1.0), ("b", -2.0), ("c", -3.0)).toDF("foo", "bar")
// 获取df 的表头
val s = df.schema
// 将原表转换成带有rdd,
//再转换成带有id的rdd,
//再展开成Seq方便转化成 Dataframe
val rows = df.rdd.zipWithUniqueId.map{case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
// 再由 row 根据原表头进行转换
val dfWithPK = spark.createDataFrame( rows, StructType(StructField("id", LongType, false) +: s.fields))
2 直接使用spark 自己的api,monotonicallyIncreasingId
这个id虽然是唯一的,但是不能从零开始,也不是顺序排列,可以简单理解为是随机产生的标识码
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// |foo| bar| id|
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
网友评论