美文网首页pysparkpyspark
Spark 结构化API DataFrames

Spark 结构化API DataFrames

作者: spraysss | 来源:发表于2019-01-04 16:03 被阅读0次

    结构化API

    结构化API是处理各种数据的工具,从非结构化log文件到半结构化CSV文件和高度结构化的Parquet文件,spark中三种主要的三类结构化API为:

    • Datasets
    • DataFrames
    • SQL tables and views

    App dag stage task

    DataFrame Dataset

    DataFrame、 Dataset是(分布式)1-。4类表集合具有定义良好的行和列
    DataFrame无类型(runtime check)
    Dataset有类型 (compile time)

    对于Spark来说, DataFrame是类型为 RowDataset
    特点:

    • immutable
    • lazily evaluated plans

    .Scala type reference

    Schemas

    模式定义了一个DataFrame的列名和类型。
    可以手动定义schema或者schema on read
    Spark类型直接映射到Spark维护的不同语言api,在Scala、Java、Python、SQLR中,每种api都有一个查询表,简单的说最终代码 使用纯spark执行( Spark’s internal Catalyst representation)

    结构化```API的执行流程

    1. 编写DataFrame/Dataset/SQL 代码.
    2. 如果代码是正确的spark将其转化为Logical Plan.
    3. Logical Plan 转为 Physical Plan
      4.Spark 在集群上执行Physical Plan (RDD 操作)

    Logical Planning

    catalogy是一个包含所有tableDataFrame信息的仓库,用于check代码是否有问题 (eg:table column 不存在)
    check 通过的plan 通过Catalyst Optimizer优化
    用户可以扩展Catalyst自定义优化规则

    Physical Planning

    DataFrams 相关操作

    加载数据

    df=spark.read.json("file:///usr/local/xldw/2015-summary.json")
    

    或者

    df = spark.read.format("json").load("file:///usr/local/xldw/2015-summary.json")
    

    查看schema

    df.schema
    df.printSchema()
    
    org.apache.spark.sql.types.StructType = ...
    StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
    StructField(ORIGIN_COUNTRY_NAME,StringType,true),
    StructField(count,LongType,true))
    
    • spark 可以根据文件的前几行推断出schema(schema on read)
    • schemaStructType 实例
    • StructType 由一个StructFields 构成
    • Boolean代表这个列是否可以为空

    手动指定Schema

    // in Scala
    import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
    import org.apache.spark.sql.types.Metadata
    val myManualSchema = StructType(Array(StructField("DEST_COUNTRY_NAME", StringType,true),
      StructField("ORIGIN_COUNTRY_NAME", StringType, true),
      StructField("count", LongType, false,
      Metadata.fromJson("{\"hello\":\"world\"}"))
    ))
    val df = spark.read.format("json").schema(myManualSchema).load("/data/flight-data/json/2015-summary.json")
    

    Columns

    spark来说, 列是一种逻辑结构,它仅表示通过表达式按每个记录计算的值. 这意味着要为列赋实值,我们需要有一行;为了得到一行,我们需要一个DataFrame

    构造和引用列的两种最基本的方式:
    colcolumn 方法

    Column作为表达式

    expr函数实际上可以解析字符串中的转换和列引用,然后可以将它们传递到进一步的转换中,下面三者等价:

    col("someCol") - 5
    expr("someCol - 5")
    expr("someCol") - 5
    

    Columns 只是表达式
    这些列和这些列的转换编译成与解析表达式相同的逻辑计划

    这意味着您可以将表达式编写为DataFrame代码或SQL表达式,并获得完全相同的性能特征

    from pyspark.sql.functions import expr, col, column
    df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
    .show(2)
    

    Records and Rows

    spark中一行使用Row对象表示,Spark使用列表达式操作行对象,以生成可用的值,行对象在内部表示字节数组。字节数组接口永远不会显示给用户,因为我们只使用列表达式来操作它们

    展示

    df.show(2)
    
    +-----------------+-------------------+-----+
    |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
    +-----------------+-------------------+-----+
    |    United States|            Romania|   15|
    |    United States|            Croatia|    1|
    +-----------------+-------------------+-----+
    

    select and selectExpr

    df.select("DEST_COUNTRY_NAME").show(2)
    -- in SQL
    SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2
    
    # select muti columns
    df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
    
    # select function favor
    df.select(expr("DEST_COUNTRY_NAME").alias("dest")).show(2)
    
    # select sql favor
    df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
    
    # selectExpr
    df.selectExpr("DEST_COUNTRY_NAME as dest", "ORIGIN_COUNTRY_NAME").show(2)
    # selectExpr  opens up the true power of Spark
    df.selectExpr("*",
                  "(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
    
    
    df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(1)
    
    

    字面量

    df.select(expr("*"), lit(1).alias("One")).show(2)
    

    添加列

    df.withColumn("numberOne", lit(1)).show(2)
    -- in SQL
    SELECT *, 1 as numberOne FROM dfTable LIMIT 2
    
    df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
    .show(2)
    //rename
    df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
    

    重命名列

    df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
    

    保留字符和关键字

    withColumn不用特殊处理
    selectExpr需要加上`

    dfWithLongColumnName = df.withColumn("this is a long column", expr("ORIGIN_COUNTRY_NAME"))
    dfWithLongColumnName.selectExpr("`this is a long column` as `long column`").show(2)
    

    大小写敏感性

    默认spark大小写不敏感,可以通过配置开启大小写敏感

    set spark.sql.caseSensitive true
    

    删除列

    df.drop("ORIGIN_COUNTRY_NAME").columns
    

    改变列的类型cast

    df.withColumn("count2", col("count").cast("long"))
    -- in SQL
    SELECT *, cast(count as long) AS count2 FROM dfTabl
    

    过滤行

    wherefilter有一样的过滤功能

    df.filter(col("count") < 2).sho
    df.where("count < 2").show(2)
    

    spark自动在同一时间执行所有过滤操作,而不管过滤器的顺序如何。这意味着,如果您想指定多个过滤器,只需按顺序将它们链接起来,其余的由Spark处理

    df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
    .show(2)
    

    Unique

    df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
    -- in SQL
    SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable
    

    随机分片

    dataFrames = df.randomSplit([0.25, 0.75], seed)
    dataFrames[0].count() > dataFrames[1].count() # False
    

    Union

    from pyspark.sql import Row
    schema = df.schema
    newRows = [
    Row("New Country", "Other Country", 5L),
    Row("New Country 2", "Other Country 3", 1L)
    ]
    parallelizedRows = spark.sparkContext.parallelize(newRows)
    newDF = spark.createDataFrame(parallelizedRows, schema)
    
    df.union(newDF)\
    .where("count = 1")\
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
    .show()
    

    sort

    sortorderBy 有相同的排序功能

    df.sort("count").show(5)
    df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
    df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
    
    from pyspark.sql.functions import desc, asc
    df.orderBy(expr("count desc")).show(2)
    df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
    -- in SQL
    SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
    

    一个高级技巧是使用asc_nulls_firstdesc_nulls_first、asc_nulls_lastdesc_nulls_last来指定希望DataFrame空值按顺序出现在哪里

    出于优化目的,有时建议先对每个分区排序然后执行之后的transformations

    spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
    .sortWithinPartitions("count")
    

    Limit

    df.limit(5).show()
    -- in SQL
    SELECT * FROM dfTable LIMIT 6
    
    df.orderBy(expr("count desc")).limit(6).show()
    -- in SQL
    SELECT * FROM dfTable ORDER BY count desc LIMIT 6
    

    重新分区和合并

    Sparkdriver程序中维护集群的状态。有时您需要向driver程序收集一些数据,以便在本地机器上对其进行操作
    到目前为止,我们还没有明确定义这个操作。然而,我们使用了几种不同的方法来实现这一点,它们实际上都是一样的
    collect从整个DataFrame获取所有数据,take获取前N行,show以表格样式打印

    collectDF = df.limit(10)
    collectDF.take(5) # take works with an Integer count
    collectDF.show() # this prints it out nicely
    collectDF.show(5, False)
    collectDF.collect()
    collectDF.toLocalIterator()
    

    相关文章

      网友评论

        本文标题:Spark 结构化API DataFrames

        本文链接:https://www.haomeiwen.com/subject/tbmikqtx.html