美文网首页
Snowpark Scala Example

Snowpark Scala Example

作者: 阿猫阿狗Hakuna | 来源:发表于2023-06-05 14:32 被阅读0次

    1.版本要求

    Scala:2.12 (不支持2.13)
    JVM for Scala: 11.x

    2.Setting Up IntelliJ IDEA CE for Snowpark Scala

    (1) 创建项目


    image.png

    (2) 在SBT文件中加入如下依赖

    libraryDependencies += "com.snowflake" % "snowpark" % "1.8.0"
    

    (3) 编写如下代码并运行

    import com.snowflake.snowpark._
    import com.snowflake.snowpark.functions._
    
    object Main {
      def main(args: Array[String]): Unit = {
        val configs = Map(
          "URL" -> "freewheelmediadev.us-east-1.snowflakecomputing.com",
          "USER" -> "pgao@freewheel.tv",
          "PASSWORD" -> "Gp5859561@",
          "ROLE" -> "USER_PGAO_ROLE",
          "WAREHOUSE" -> "WH_USER_PGAO_XS",
          "DB" -> "DEMO_DB",
          "SCHEMA" -> "LAKE_HOUSE_HUDI"
        )
        val session = Session.builder.configs(configs).create
        session.sql("show tables").show()
      }
    }
    

    3.Working with DataFrames

    (1) 建立数据表

    CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
    INSERT INTO sample_product_data VALUES
        (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
        (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
        (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
        (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
        (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
        (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
        (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
        (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
        (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
        (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
        (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
        (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
    

    (2) 构建Dataframe

    • 要从表、视图或流中的数据创建一个DataFrame,可以调用table方法:
    // Create a DataFrame from the data in the "sample_product_data" table.
    val dfTable = session.table("sample_product_data")
    
    // To print out the first 10 rows, call:
    //   dfTable.show()
    

    session.table方法返回一个Updatable对象。Updatable继承自DataFrame,并提供了额外的方法来处理表中的数据(例如,用于更新和删除数据的方法)。

    • 要从值的序列创建一个DataFrame,可以调用createDataFrame方法:
    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
    • 要创建一个包含一定范围值的DataFrame,可以调用range方法:
    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
    • 要为Stage中的文件创建一个DataFrame,可以调用read方法来获取一个DataFrameReader对象。在DataFrameReader对象中,调用与文件中数据格式相对应的方法:
    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
    • 要创建一个用于保存 SQL 查询结果的 DataFrame,可以调用 sql 方法:
    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    

    (3) 对Dataframe进行transformation

    • 要指定应返回哪些行,请调用 filter 方法:
    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val df = session.table("sample_product_data").filter(col("id") === 1)
    df.show()
    
    • 要指定应选择哪些列,请调用 select 方法:
    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    df.show()
    

    每个方法返回一个经过转换的新的 DataFrame 对象(不会影响原始的 DataFrame 对象)。这意味着如果你想应用多个转换操作,你可以链式调用方法,将每个后续的转换方法调用应用在前一个方法返回的新 DataFrame 对象上。
    请注意,这些transformation方法并不从Snowflake数据库中检索数据。(在执行DataFrame的action方法时,会执行数据检索。)transformation方法仅仅指定了如何构造SQL语句。

    • 处理不同Dataframe中的相同Column
      当在两个具有相同列名的不同 DataFrame 对象中引用列时(例如,在该列上进行连接操作),您可以在一个 DataFrame 对象中使用 DataFrame.col 方法来引用该对象中的列(例如,df1.col("name") 和 df2.col("name"))。
    // Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
    // Use the DataFrame.col method to refer to the columns used in the join.
    val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
    
    • 将列对象转换为特定类型
    // Import for the lit function.
    import com.snowflake.snowpark.functions._
    // Import for the DecimalType class..
    import com.snowflake.snowpark.types._
    
    val decimalValue = lit(0.05).cast(new DecimalType(5,2))
    
    • 链式调用方法
    val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
    dfProductInfo.show()
    
    • 获取列定义
      要检索DataFrame数据集中列的定义,请调用schema方法。此方法返回一个StructType对象,其中包含StructField对象的数组。每个StructField对象包含列的定义。
    // Get the StructType object that describes the columns in the
    // underlying rowset.
    val tableSchema = session.table("sample_product_data").schema
    println("Schema for sample_product_data: " + tableSchema);
    

    4.Dataframe Join

    (1) 示例数据

    create or replace table sample_a (
      id_a integer,
      name_a varchar,
      value integer
    );
    insert into sample_a (id_a, name_a, value) values
      (10, 'A1', 5),
      (40, 'A2', 10),
      (80, 'A3', 15),
      (90, 'A4', 20)
    ;
    create or replace table sample_b (
      id_b integer,
      name_b varchar,
      id_a integer,
      value integer
    );
    insert into sample_b (id_b, name_b, id_a, value) values
      (4000, 'B1', 40, 10),
      (4001, 'B2', 10, 5),
      (9000, 'B3', 80, 15),
      (9099, 'B4', null, 200)
    ;
    create or replace table sample_c (
      id_c integer,
      name_c varchar,
      id_a integer,
      id_b integer
    );
    insert into sample_c (id_c, name_c, id_a, id_b) values
      (1012, 'C1', 10, null),
      (1040, 'C2', 40, 4000),
      (1041, 'C3', 40, 4001)
    ;
    

    (2)指定要Join的Column

    val dfLhs = session.table("sample_a")
    val dfRhs = session.table("sample_b")
    val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
    val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
    dfSelected.show()
    

    (3) 指定Join类型
    Join类型有以下几种:

    image.png
    // Create a DataFrame that performs a left outer join on
    // "sample_a" and "sample_b" on the column named "id_a".
    val dfLhs = session.table("sample_a")
    val dfRhs = session.table("sample_b")
    val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
    dfLeftOuterJoin.show()
    

    5.对DataFrame进行Action操作

    如前所述,DataFrame是惰性执行的,这意味着直到执行某个操作之前,SQL语句不会被发送到服务器执行。

    执行同步Action操作

    image.png

    for example:

    // Create a DataFrame for the "sample_product_data" table.
    val dfProducts = session.table("sample_product_data")
    
    // Send the query to the server for execution and
    // print the count of rows in the table.
    println("Rows returned: " + dfProducts.count())
    

    执行异步Action操作

    要异步执行操作,调用async方法返回一个"async actor"对象(例如DataFrameAsyncActor),然后在该对象中调用异步操作方法。
    这些异步 actor 对象的操作方法会返回一个 TypedAsyncJob 对象,你可以使用该对象来检查异步操作的状态并获取操作的结果。

    image.png

    从返回的 TypedAsyncJob 对象中,你可以进行以下操作:

    • 要确定操作是否已完成,调用 isDone 方法。
    • 要获取与操作对应的查询 ID,请调用 getQueryId 方法。
    • 要返回操作的结果(例如 collect 方法的 Row 对象数组或 count 方法的行数),请调用 getResult 方法。请注意,getResult 是一个阻塞调用。
    • 要取消操作,请调用 cancel 方法。

    example:

    // Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
    // This does not execute the query.
    val df = session.table("sample_product_data").select(col("id"), col("name"))
    
    // Execute the query asynchronously.
    // This call does not block.
    val asyncJob = df.async.collect()
    // Check if the query has completed execution.
    println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
    // Get an Array of Rows containing the results, and print the results.
    // Note that getResult is a blocking call.
    val results = asyncJob.getResult()
    results.foreach(println)
    

    指定最长等待时间

    // Wait a maximum of 10 seconds for the query to complete before retrieving the results.
    val results = asyncJob.getResult(10)
    

    通过ID获取异步query

    val asyncJob = session.createAsyncJob(myQueryId)
    // Check if the query has completed execution.
    println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
    // If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
    // Note that getRows is a blocking call.
    val rows = asyncJob.getRows()
    rows.foreach(println)
    

    6.在Table中Updating/Deleting/Merging Rows

    当你调用 Session.table 来为一个表创建一个 DataFrame 对象时,该方法会返回一个 Updatable 对象,它扩展了 DataFrame,并提供了额外的方法用于更新和删除表中的数据。
    如果你需要更新或删除表中的行,可以使用 Updatable 类的以下方法:

    • 调用 update 来更新表中的现有行。
    • 调用 delete 来从表中删除行。
    • 调用 merge 来根据第二个表或子查询中的数据,向一个表中插入、更新和删除行(这相当于 SQL 中的 MERGE 命令)。

    Updating rows
    对于 update 方法,传入一个 Map,将要更新的列与对应的值关联起来。update 方法会返回一个 UpdateResult 对象,其中包含被更新的行数。
    update 是一个Action方法,这意味着调用该方法会发送 SQL 语句到服务器执行。

    val updatableDf = session.table("sample_product_data")
    val updateResult = updatableDf.update(Map("count" -> lit(1)))
    println(s"Number of rows updated: ${updateResult.rowsUpdated}")
    

    Deleting rows
    对于 delete 方法,你可以指定一个条件来标识要删除的行,并且可以基于与另一个 DataFrame 的连接来构建该条件。delete 方法会返回一个 DeleteResult 对象,其中包含被删除的行数。
    delete 是一个操作方法,这意味着调用该方法会发送 SQL 语句到服务器执行。

    val updatableDf = session.table("sample_product_data")
    val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
    println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
    

    Merging rows

    val mergeResult = target.merge(source, target("id") === source("id"))
                          .whenNotMatched.insert(Seq(source("id"), source("value")))
                          .collect()
    
    val mergeResult = target.merge(source, target("id") === source("id"))
                          .whenMatched.update(Map("value" -> source("value")))
                          .collect()
    

    7.将数据保存到Table中

    df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
    

    默认情况下,columnOrder 选项被设置为 "index",这意味着 DataFrameWriter 按照列的出现顺序插入值。例如,DataFrameWriter 将从 DataFrame 的第一列插入值到表中的第一列,从 DataFrame 的第二列插入值到表中的第二列,依此类推。
    如果你将行插入到现有的表中(SaveMode.Append),并且 DataFrame 中的列名与表中的列名匹配,可以调用 DataFrameWriter.option 方法,传入 "columnOrder" 和 "name" 作为参数。

    8.通过DataFrame创建一个View

    // 创建persistent view
    df.createOrReplaceView("db.schema.viewName")
    
    // 创建tmp view
    df.createOrReplaceTempView("db.schema.viewName")
    

    9.Cache DataFrame

    在某些情况下,你可能需要执行复杂的查询并保留结果以供后续操作使用(而不是再次执行相同的查询)。在这种情况下,你可以通过调用 DataFrame.cacheResult 方法来缓存 DataFrame 的内容。
    该方法的工作方式如下:

    • 运行查询。
      在调用 cacheResult 之前,你无需调用单独的操作方法来检索结果。cacheResult 是一个执行查询的操作方法。
    • 将结果保存在临时表中。
      由于 cacheResult 创建了一个临时表,因此你必须对正在使用的模式具有 CREATE TABLE 权限。
    • 返回一个 HasCachedResult 对象,该对象提供对临时表中结果的访问。
      由于 HasCachedResult 扩展了 DataFrame,你可以对这些缓存数据执行与 DataFrame 上相同的一些操作。
    import com.snowflake.snowpark.functions_
    
    // Set up a DataFrame to query a table.
    val df = session.table("sample_product_data").filter(col("category_id") > 10)
    // Retrieve the results and cache the data.
    val cachedDf = df.cacheResult()
    // Create a DataFrame containing a subset of the cached data.
    val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
    dfSubset.show()
    

    相关文章

      网友评论

          本文标题:Snowpark Scala Example

          本文链接:https://www.haomeiwen.com/subject/txiuedtx.html