RDD
RDD是spark最基础的抽象类
其拥有以下几个特点:
- Dependencies(依赖)
- Partitions(分区)
- Compute Function(计算函数):Partition => Iterator[T]
Dependencies提供了RDD的结构性,例如需要重新输出结果,Spark就可以使用Denpendecies重新创建RDD,来复制一个运行。这个特性使得RDD更加灵活。
Partitions是的Spark可以将一个任务拆分开,使用不同的Executors平行运行。另外有些时候,例如从HDFS读取信息,Spark会利用本地信息去派发任务到更接近数据的Executors。
最后,一个RDD拥有Compute Function,他可以为RDD中的数据输出一个遍历器Iterator[T]。
目前存在以下几个问题:
- 对于Spark来说,Compute Function是模糊的。例如我们使用了Select(), Filter(), Spark得到的仅仅是一个Lambda表达式。
- 其次,Iterator[T]数据类型同样是模糊的,Spark直知道这是一个Python的通用类。
因无法检测计算类型,Spark是没有办法去优化表达式的。
- 最后,Spark同样不知道T的确切的数据类型。
Key Merits and Benefits
例子:根据名字聚合平均年龄。
使用low-level RDD API:
## Create an RDD of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 20), ("Brooke", 25)])
## Use map and reduceByKey transformation with their lambda expression to aggregate and then compute average
agesRDD = (dataRDD.
map(lambda x:(x[0], (x[1], 1))). # [('Brooke', (20, 1)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (20, 1)), ('Brooke', (25, 1))]
reduceByKey(lambda x, y:(x[0] + y[0], x[1] + y[1])). # [('Brooke', (45, 2)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (20, 1))]
map(lambda x: (x[0], x[1][0]/x[1][1]))) # [('Brooke', 22.5), ('Denny', 31), ('Jules', 30), ('TD', 20)]
使用High-level DSL:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
## Create DataFrame Using SparkSession
spark = (SparkSession.
builder.
appName('AuthorsAges').
getOrCreate())
## Create a DataFrame
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 20), ("Brooke", 25)], ['name', 'age'])
avg_df = data_df.groupby('name').agg(avg('age'))
avg_df.show()
DataFrame API
Basic Data Type:
Structured Data Type:
Schemas 和创建DataFrames
Schema指列名和该列的数据类型。提前声明Schema的优点:
- 免去了Spark判断数据类型的职责
- 避免Spark去读大量的数据来确认Schemas
- 检测数据类型不符合的错误
声明Schemas的两种方法
- 程式化的声明
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False), StructField("title", StringType(), False), StructField("pages", IntegerType(), False)])
- DDL(Data Definition Language)
schema = "author STRING, title STRING, pages INT"
DataFrame运算
- DataFrameReader & DataFrameWriter
from pyspark.sql.types import *
# Programmatic way to define a schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True), StructField('UnitID', StringType(), True), StructField('IncidentNumber', IntegerType(), True), StructField('CallType', StringType(), True), StructField('CallDate', StringType(), True), StructField('WatchDate', StringType(), True), StructField('CallFinalDisposition', StringType(), True), StructField('AvailableDtTm', StringType(), True), StructField('Address', StringType(), True), StructField('City', StringType(), True), StructField('Zipcode', IntegerType(), True), StructField('Battalion', StringType(), True), StructField('StationArea', StringType(), True), StructField('Box', StringType(), True), StructField('OriginalPriority', StringType(), True), StructField('Priority', StringType(), True), StructField('FinalPriority', IntegerType(), True), StructField('ALSUnit', BooleanType(), True), StructField('CallTypeGroup', StringType(), True), StructField('NumAlarms', IntegerType(), True), StructField('UnitType', StringType(), True), StructField('UnitSequenceInCallDispatch', IntegerType(), True), StructField('FirePreventionDistrict', StringType(), True), StructField('SupervisorDistrict', StringType(), True), StructField('Neighborhood', StringType(), True), StructField('Location', StringType(), True), StructField('RowID', StringType(), True), StructField('Delay', FloatType(), True)])
# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv" fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
# Save to a path
parquet_path = ...
fire_df.write.format("parquet").save(parquet_path)
# Save to Hive Table
parquet_table = ... # name of the table fire_df.write.format("parquet").saveAsTable(parquet_table)
- Projections and Filters
few_fire_df = (fire_df .select("IncidentNumber", "AvailableDtTm", "CallType") .where(col("CallType") != "Medical Incident")) few_fire_df.show(5, truncate=False)
from pyspark.sql.functions import *
fire_df.
select("CallType").
where(col("CallType").
isNotNull()).
agg(countDistinct("CallType").
alias("DistinctCallTypes")).
show()
Spark SQL 和底层引擎
Spark SQL允许开发者使用ANSI SQL:2003-compatible对有结构且包含schema的数据进行查询。Spark SQL是在Spark 1.3添加至核心引擎,所以大多数high-level程式同样可以使用。除了可以使用类似SQL的查询语句外,Spark SQL还会提供以下几个功能:
Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee
网友评论