美文网首页Spark
Learning Spark [2] - Spark API结构

Learning Spark [2] - Spark API结构

作者: 屹然1ran | 来源:发表于2021-01-07 15:28 被阅读0次

    RDD

    RDD是spark最基础的抽象类

    其拥有以下几个特点:

    • Dependencies(依赖)
    • Partitions(分区)
    • Compute Function(计算函数):Partition => Iterator[T]

    Dependencies提供了RDD的结构性,例如需要重新输出结果,Spark就可以使用Denpendecies重新创建RDD,来复制一个运行。这个特性使得RDD更加灵活。
    Partitions是的Spark可以将一个任务拆分开,使用不同的Executors平行运行。另外有些时候,例如从HDFS读取信息,Spark会利用本地信息去派发任务到更接近数据的Executors。
    最后,一个RDD拥有Compute Function,他可以为RDD中的数据输出一个遍历器Iterator[T]。

    目前存在以下几个问题:

    • 对于Spark来说,Compute Function是模糊的。例如我们使用了Select(), Filter(), Spark得到的仅仅是一个Lambda表达式。
    • 其次,Iterator[T]数据类型同样是模糊的,Spark直知道这是一个Python的通用类。

    因无法检测计算类型,Spark是没有办法去优化表达式的。

    • 最后,Spark同样不知道T的确切的数据类型。

    Key Merits and Benefits

    例子:根据名字聚合平均年龄。

    使用low-level RDD API:

    ## Create an RDD of tuples (name, age)
    dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 20), ("Brooke", 25)])
    
    ## Use map and reduceByKey transformation with their lambda expression to aggregate and then compute average
    agesRDD = (dataRDD.
               map(lambda x:(x[0], (x[1], 1))). # [('Brooke', (20, 1)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (20, 1)), ('Brooke', (25, 1))]
               reduceByKey(lambda x, y:(x[0] + y[0], x[1] + y[1])). # [('Brooke', (45, 2)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (20, 1))]
               map(lambda x: (x[0], x[1][0]/x[1][1]))) # [('Brooke', 22.5), ('Denny', 31), ('Jules', 30), ('TD', 20)]
    

    使用High-level DSL:

    from pyspark.sql import SparkSession 
    from pyspark.sql.functions import avg 
    
    ## Create DataFrame Using SparkSession
    spark = (SparkSession.
             builder.
             appName('AuthorsAges').
             getOrCreate())
    
    ## Create a DataFrame
    data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 20), ("Brooke", 25)], ['name', 'age'])
    avg_df = data_df.groupby('name').agg(avg('age'))
    avg_df.show()
    

    DataFrame API

    Basic Data Type:

    Structured Data Type:

    Schemas 和创建DataFrames

    Schema指列名和该列的数据类型。提前声明Schema的优点:

    • 免去了Spark判断数据类型的职责
    • 避免Spark去读大量的数据来确认Schemas
    • 检测数据类型不符合的错误

    声明Schemas的两种方法

    • 程式化的声明
    from pyspark.sql.types import * 
    schema = StructType([StructField("author", StringType(), False),  StructField("title", StringType(), False),  StructField("pages", IntegerType(), False)]) 
    
    • DDL(Data Definition Language)
    schema = "author STRING, title STRING, pages INT"
    

    DataFrame运算

    • DataFrameReader & DataFrameWriter
    from pyspark.sql.types import *
    # Programmatic way to define a schema 
    fire_schema = StructType([StructField('CallNumber', IntegerType(), True),                StructField('UnitID', StringType(), True),                StructField('IncidentNumber', IntegerType(), True),                StructField('CallType', StringType(), True),                                  StructField('CallDate', StringType(), True),                      StructField('WatchDate', StringType(), True),                StructField('CallFinalDisposition', StringType(), True),                StructField('AvailableDtTm', StringType(), True),                StructField('Address', StringType(), True),                       StructField('City', StringType(), True),                       StructField('Zipcode', IntegerType(), True),                       StructField('Battalion', StringType(), True),                                 StructField('StationArea', StringType(), True),                       StructField('Box', StringType(), True),                       StructField('OriginalPriority', StringType(), True),                       StructField('Priority', StringType(), True),                       StructField('FinalPriority', IntegerType(), True),                       StructField('ALSUnit', BooleanType(), True),                       StructField('CallTypeGroup', StringType(), True),                StructField('NumAlarms', IntegerType(), True),                StructField('UnitType', StringType(), True),                StructField('UnitSequenceInCallDispatch', IntegerType(), True),                StructField('FirePreventionDistrict', StringType(), True),                StructField('SupervisorDistrict', StringType(), True),                StructField('Neighborhood', StringType(), True),                StructField('Location', StringType(), True),                StructField('RowID', StringType(), True),                StructField('Delay', FloatType(), True)])
    # Use the DataFrameReader interface to read a CSV file 
    sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv" fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
    
    # Save to a path
    parquet_path = ... 
    fire_df.write.format("parquet").save(parquet_path) 
    
    # Save to Hive Table
    parquet_table = ... # name of the table fire_df.write.format("parquet").saveAsTable(parquet_table)
    
    • Projections and Filters
    few_fire_df = (fire_df  .select("IncidentNumber", "AvailableDtTm", "CallType")   .where(col("CallType") != "Medical Incident")) few_fire_df.show(5, truncate=False)
    
    from pyspark.sql.functions import * 
    fire_df.
    select("CallType").
    where(col("CallType").
    isNotNull()).
    agg(countDistinct("CallType").
    alias("DistinctCallTypes")).
    show()
    

    Spark SQL 和底层引擎

    Spark SQL允许开发者使用ANSI SQL:2003-compatible对有结构且包含schema的数据进行查询。Spark SQL是在Spark 1.3添加至核心引擎,所以大多数high-level程式同样可以使用。除了可以使用类似SQL的查询语句外,Spark SQL还会提供以下几个功能:

    Reference
    Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee

    相关文章

      网友评论

        本文标题:Learning Spark [2] - Spark API结构

        本文链接:https://www.haomeiwen.com/subject/aquzoktx.html