1.版本要求
Scala:2.12 (不支持2.13)
JVM for Scala: 11.x
2.Setting Up IntelliJ IDEA CE for Snowpark Scala
(1) 创建项目
image.png
(2) 在SBT文件中加入如下依赖
libraryDependencies += "com.snowflake" % "snowpark" % "1.8.0"
(3) 编写如下代码并运行
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
object Main {
def main(args: Array[String]): Unit = {
val configs = Map(
"URL" -> "freewheelmediadev.us-east-1.snowflakecomputing.com",
"USER" -> "pgao@freewheel.tv",
"PASSWORD" -> "Gp5859561@",
"ROLE" -> "USER_PGAO_ROLE",
"WAREHOUSE" -> "WH_USER_PGAO_XS",
"DB" -> "DEMO_DB",
"SCHEMA" -> "LAKE_HOUSE_HUDI"
)
val session = Session.builder.configs(configs).create
session.sql("show tables").show()
}
}
3.Working with DataFrames
(1) 建立数据表
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
(2) 构建Dataframe
- 要从表、视图或流中的数据创建一个DataFrame,可以调用table方法:
// Create a DataFrame from the data in the "sample_product_data" table.
val dfTable = session.table("sample_product_data")
// To print out the first 10 rows, call:
// dfTable.show()
session.table方法返回一个Updatable对象。Updatable继承自DataFrame,并提供了额外的方法来处理表中的数据(例如,用于更新和删除数据的方法)。
- 要从值的序列创建一个DataFrame,可以调用createDataFrame方法:
// Create a DataFrame containing a sequence of values.
// In the DataFrame, name the columns "i" and "s".
val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
- 要创建一个包含一定范围值的DataFrame,可以调用range方法:
// Create a DataFrame from a range
val dfRange = session.range(1, 10, 2)
- 要为Stage中的文件创建一个DataFrame,可以调用read方法来获取一个DataFrameReader对象。在DataFrameReader对象中,调用与文件中数据格式相对应的方法:
// Create a DataFrame from data in a stage.
val dfJson = session.read.json("@mystage2/data1.json")
- 要创建一个用于保存 SQL 查询结果的 DataFrame,可以调用 sql 方法:
// Create a DataFrame from a SQL query
val dfSql = session.sql("SELECT name from products")
(3) 对Dataframe进行transformation
- 要指定应返回哪些行,请调用 filter 方法:
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._
// Create a DataFrame for the rows with the ID 1
// in the "sample_product_data" table.
//
// This example uses the === operator of the Column object to perform an
// equality check.
val df = session.table("sample_product_data").filter(col("id") === 1)
df.show()
- 要指定应选择哪些列,请调用 select 方法:
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._
// Create a DataFrame that contains the id, name, and serial_number
// columns in te "sample_product_data" table.
val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
df.show()
每个方法返回一个经过转换的新的 DataFrame 对象(不会影响原始的 DataFrame 对象)。这意味着如果你想应用多个转换操作,你可以链式调用方法,将每个后续的转换方法调用应用在前一个方法返回的新 DataFrame 对象上。
请注意,这些transformation方法并不从Snowflake数据库中检索数据。(在执行DataFrame的action方法时,会执行数据检索。)transformation方法仅仅指定了如何构造SQL语句。
- 处理不同Dataframe中的相同Column
当在两个具有相同列名的不同 DataFrame 对象中引用列时(例如,在该列上进行连接操作),您可以在一个 DataFrame 对象中使用 DataFrame.col 方法来引用该对象中的列(例如,df1.col("name") 和 df2.col("name"))。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
- 将列对象转换为特定类型
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._
val decimalValue = lit(0.05).cast(new DecimalType(5,2))
- 链式调用方法
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
- 获取列定义
要检索DataFrame数据集中列的定义,请调用schema方法。此方法返回一个StructType对象,其中包含StructField对象的数组。每个StructField对象包含列的定义。
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
4.Dataframe Join
(1) 示例数据
create or replace table sample_a (
id_a integer,
name_a varchar,
value integer
);
insert into sample_a (id_a, name_a, value) values
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
create or replace table sample_b (
id_b integer,
name_b varchar,
id_a integer,
value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', null, 200)
;
create or replace table sample_c (
id_c integer,
name_c varchar,
id_a integer,
id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
(1012, 'C1', 10, null),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
(2)指定要Join的Column
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
(3) 指定Join类型
Join类型有以下几种:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
5.对DataFrame进行Action操作
如前所述,DataFrame是惰性执行的,这意味着直到执行某个操作之前,SQL语句不会被发送到服务器执行。
执行同步Action操作
image.pngfor example:
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
执行异步Action操作
要异步执行操作,调用async方法返回一个"async actor"对象(例如DataFrameAsyncActor),然后在该对象中调用异步操作方法。
这些异步 actor 对象的操作方法会返回一个 TypedAsyncJob 对象,你可以使用该对象来检查异步操作的状态并获取操作的结果。
从返回的 TypedAsyncJob 对象中,你可以进行以下操作:
- 要确定操作是否已完成,调用 isDone 方法。
- 要获取与操作对应的查询 ID,请调用 getQueryId 方法。
- 要返回操作的结果(例如 collect 方法的 Row 对象数组或 count 方法的行数),请调用 getResult 方法。请注意,getResult 是一个阻塞调用。
- 要取消操作,请调用 cancel 方法。
example:
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
指定最长等待时间
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
通过ID获取异步query
val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
6.在Table中Updating/Deleting/Merging Rows
当你调用 Session.table 来为一个表创建一个 DataFrame 对象时,该方法会返回一个 Updatable 对象,它扩展了 DataFrame,并提供了额外的方法用于更新和删除表中的数据。
如果你需要更新或删除表中的行,可以使用 Updatable 类的以下方法:
- 调用 update 来更新表中的现有行。
- 调用 delete 来从表中删除行。
- 调用 merge 来根据第二个表或子查询中的数据,向一个表中插入、更新和删除行(这相当于 SQL 中的 MERGE 命令)。
Updating rows
对于 update 方法,传入一个 Map,将要更新的列与对应的值关联起来。update 方法会返回一个 UpdateResult 对象,其中包含被更新的行数。
update 是一个Action方法,这意味着调用该方法会发送 SQL 语句到服务器执行。
val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
Deleting rows
对于 delete 方法,你可以指定一个条件来标识要删除的行,并且可以基于与另一个 DataFrame 的连接来构建该条件。delete 方法会返回一个 DeleteResult 对象,其中包含被删除的行数。
delete 是一个操作方法,这意味着调用该方法会发送 SQL 语句到服务器执行。
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Merging rows
val mergeResult = target.merge(source, target("id") === source("id"))
.whenNotMatched.insert(Seq(source("id"), source("value")))
.collect()
val mergeResult = target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("value" -> source("value")))
.collect()
7.将数据保存到Table中
df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
默认情况下,columnOrder 选项被设置为 "index",这意味着 DataFrameWriter 按照列的出现顺序插入值。例如,DataFrameWriter 将从 DataFrame 的第一列插入值到表中的第一列,从 DataFrame 的第二列插入值到表中的第二列,依此类推。
如果你将行插入到现有的表中(SaveMode.Append),并且 DataFrame 中的列名与表中的列名匹配,可以调用 DataFrameWriter.option 方法,传入 "columnOrder" 和 "name" 作为参数。
8.通过DataFrame创建一个View
// 创建persistent view
df.createOrReplaceView("db.schema.viewName")
// 创建tmp view
df.createOrReplaceTempView("db.schema.viewName")
9.Cache DataFrame
在某些情况下,你可能需要执行复杂的查询并保留结果以供后续操作使用(而不是再次执行相同的查询)。在这种情况下,你可以通过调用 DataFrame.cacheResult 方法来缓存 DataFrame 的内容。
该方法的工作方式如下:
- 运行查询。
在调用 cacheResult 之前,你无需调用单独的操作方法来检索结果。cacheResult 是一个执行查询的操作方法。 - 将结果保存在临时表中。
由于 cacheResult 创建了一个临时表,因此你必须对正在使用的模式具有 CREATE TABLE 权限。 - 返回一个 HasCachedResult 对象,该对象提供对临时表中结果的访问。
由于 HasCachedResult 扩展了 DataFrame,你可以对这些缓存数据执行与 DataFrame 上相同的一些操作。
import com.snowflake.snowpark.functions_
// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
网友评论