官网(https://docs.delta.io/latest/quick-start.html)例子,如下所示:
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100"))
)
报错:
Caused by: java.lang.UnsupportedOperationException: UPDATE TABLE is not supported temporarily.
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:767)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:107)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:199)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:199)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset(AnalysisHelper.scala:45)
at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset$(AnalysisHelper.scala:44)
at io.delta.tables.DeltaTable.toDataset(DeltaTable.scala:42)
at io.delta.tables.execution.DeltaTableOperations.$anonfun$executeUpdate$1(DeltaTableOperations.scala:64)
at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
... 8 more
pom文件如下所示:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>0.7.0</version>
</dependency>
定位到源码发现,如下所示,在SparkStrategies.scala中,我们可以看到Update和Merge暂时不支持。
(省略)...
case _: UpdateTable =>
throw new UnsupportedOperationException(s"UPDATE TABLE is not supported temporarily.")
case _: MergeIntoTable =>
throw new UnsupportedOperationException(s"MERGE INTO TABLE is not supported temporarily.")
网友评论