1. 摘要
主题:在这篇文章中可以找到一些简单的示例说明Spark在读取存储在 Parquet 中的分区表时的重要特性,尤其是性能调优。涵盖的主要主题是:
•分区修剪
•列投影
•谓词下推
•用于调查 Parquet 元数据的工具
•测量 Spark 指标的工具
动机:Spark 和 Parquet 的结合是目前一个非常流行的构建可扩展分析平台的基础。性能、可扩展性和易用性是该解决方案的关键点,对用户非常有吸引力。也是本文描述的工作,特别是 Spark 工作负载和 Parquet 内部的性能特征,以更好地了解幕后发生的事情,哪些工作正常以及目前的一些限制是什么。
2. 实验环境
这篇文章有些示例代码片段,以便用户能够在测试环境中使用这些示例。我在这篇文章中使用了来自 TPCDS 基准测试架构的表。Spark 上的 TPCDS 设置说明可以在 Github 上找到,Spark SQL Performance Tests[1]
本文在 12 个节点的 YARN/Hadoop 集群上使用 Spark来测试示例和性能,但这并非绝对依赖,你可以使用本地文件系统以及本地模式来运行相关测试。我们使用 spark-shell 运行了大部分示例,但是这些示例使用 Spark SQL,因此在大多数情况下在 PySpark 和 Notebook环境中也不用变化。
本文中讨论的实验室已经使用 Spark 2.2.0 版(候选发布版)和 Spark 2.1.1版本进行测试。Spark 2.2.0 依赖 Parquet 1.8.2版本(Spark 2.1.1 依赖 Parquet 1.8.1)。
2.1 测试的Parquet表
此处提供的示例使用TPCDS 模式[2]的 1500 GB 规模生成的表。示例中只使用了一张表,我们选择使用最大的事实表:STORE_SALES,该表是分区表。
实验环境中表总大小为 185 GB,考虑到 HDFS的三副本,它总共增加了 556 GB,可以通过如下方式查看:
$ hdfs dfs -du -h -s TPCDS/tpcds_1500/store_sales
185.3 G 556.0 G TPCDS/tpcds_1500/store_sales
通过列出表根目录的内容可以看到分区结构。我们使用的测试模式中有 1824 个分区,每个分区都组织在一个单独的文件夹中,文件夹名称包含分区键,这些文件是用 snappy 压缩的,每个单独文件的大小取决于给定分区中的数据量,下面是构成测试模式中 store_sale 表的文件之一的路径和大小示例:
Name: TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
Bytes: 208004821
2.2 Spark and Parquet
分区是许多数据库和数据处理框架的一项功能,它是运行大规模 Spark 作业的关键,Spark 以一种直接的方式处理 Parquet 中的分区表,上一段中描述的 TPCDS 模式中的 STORES_SALES 是如何在文件系统(在这种情况下为 HDFS)上实现分区的示例,Spark 可以读取存储在 Parquet 中的表并使用简单的 API 进行分区发现,下面是使用DataFrame方式读取 STORE_SALES 表的示例
val df = spark.read.
format("parquet").
load("TPCDS/tpcds_1500/store_sales")
下面是将 Spark DataFrame df 按分区写入 Parquet 文件的示例(按照表 STORE_SALES 的示例,分区键是 ss_sold_date_sk)
df.write.
partitionBy("ss_sold_date_sk").
parquet("TPCDS/tpcds_1500/store_sales_copy")
3. 分区裁剪
让我们从简单的事情开始探索:分区裁剪。此功能是大多数实现分区的系统所共有的,它可以通过减少处理查询/数据访问所需的 I/O 来显着加快工作负载速度。分区裁剪背后的基本思想,至少是这里讨论的示例中最简单的单表访问形式,是基于分区键上的过滤器仅从分区列表中读取数据,跳过其余部分。说明该概念的一个示例是query(2),然而在开始之前我们先介绍一个"基线工作负载",目的是有一个基准参考点来比较这篇文章中测试的所有优化的结果。
1.测试基线:全表扫描,无分区裁剪
在 YARN 集群上使用 spark-shell运行作业,当然也可以在本地模式下(即使用 --master local[*])或使用 pyspark 或Notebook来运行测试。启动 spark-shell(根据您的环境进行自定义)的命令如下
$ spark-shell --num-executors 12 --executor-cores 4 --executor-memory 4g
下一步是使用 Spark Dataframe API 读取 Parquet 中的文件,并将生成的 DataFrame 注册为 Spark 中的临时视图。
spark.read.format("parquet").load("TPCDS/tpcds_1500/store_sales").createOrReplaceTempView("store_sales")
这个 Spark SQL 命令会导致对表 store_sales 的所有分区进行全面扫描,我们将在本文中将其用作"基线工作负载"。
// query (1), this is a full scan of the table store_sales
spark.sql("select * from store_sales where ss_sales_price=-1.0").collect()
在测试中查询读取了大约 185 GB 的数据和 43 亿行,可以从 Spark Web UI[3](查找阶段指标:输入大小/记录)或使用本文稍后讨论的工具 sparkMeasure 找到此类性能指标,以下是在query(1) 测试中衡量的关键指标:
•Total Time Across All Tasks: 59 min
•Locality Level Summary: Node local: 1675
•Input Size / Records: 185.3 GB / 4319943621
•Duration: 1.3 min
查询的执行计划显示了Spark如何执行查询:它分发给Executor读取HDFS上的所有分区/文件,然后过滤"ss_sales_price = -1",最后将结果集收集到Driver。请注意可以在 Web UI 中或使用 Spark DataFrame 上的explain方法打印执行计划。
注意:在此阶段可以忽略测试查询中的 where 子句"ss_sales_price = -1.0",这样做是为了让查询返回一个空的结果集(没有销售价格是负数),而不是返回 185 GB 的数据来让Driver挂掉!执行query(1)时发生的情况是 Spark 必须在应用过滤器"ss_sales_price = -1.0"之前扫描 Parquet 中的所有表分区(文件),因此此示例可以当做是表全扫描基线工作负载,在这篇文章的关于谓词下推的讨论中找到有关其工作原理和原因的更多详细信息。
2.在分区键上使用过滤器并进行分区裁剪
这是 Spark SQL 可以使用分区裁剪的查询示例,该查询类似于基线query(1),但对分区键的附加过滤器进行了变更,查询可以通过只读取 STORE_SALES 表的一个分区来执行。
// query(2), example of partition pruning
spark.sql("select * from store_sales where ss_sold_date_sk=2452621 and ss_sales_price=-1.0").collect()
query(2)的执行比query(1) 的快得多,这是因为查询SQL和过滤器是不同的,其中值得注意的是query(2)只需要读取一个分区,这会产生巨大的差异。Web UI 指标显示query(2)只需要读取 198.6 MB 的数据(4502609 条记录)。
•Total Time Across All Tasks: 6 s
•Locality Level Summary: Node local: 48
•Input Size / Records: 198.6 MB / 4502609
•Duration: 3 s
执行计划确认分区键"ss_sold_date_sk=2452621"上的过滤器已经下推到 Parquet 读取器,它可以使用该信息来读取包含该分区数据的文件。注意:在前面的例子中,额外的过滤器"ss_sales_price = -1"用于返回一个空集,而不是用结果集填充Driver端,如前所述,为了比较扫描与分区裁剪,因为过滤器仅在 Spark 读取分区数据后评估。
回顾一下本节展示了两个使用 Spark 读取 Parquet 分区表的示例。第一个示例(query 1)是基线工作负载,对整个表执行全扫描,第二个示例(query 2)对分区进行滤器允许 Spark 使用分区裁剪来减少 I/O 。如果 Spark 未使用分区裁剪,则第二个查询也必须扫描全表。
4. 列投影
此功能背后的想法很简单:只需读取查询需要处理的列的数据并跳过其余列的数据。Parquet 等面向列的数据格式可以很自然地实现此功能,这与面向行的数据格式形成对比,后者通常用于关系数据库和/或系统,其中优化单行插入和更新非常重要。
列投影可以显着减少读取表格所需的工作并提高性能,实际的性能提升取决于查询,特别是需要读取的数据/列的比例来回答查询背后的业务问题。
例如,用于示例query(1) 和 (2) 的表"store_sales"有 23 列,对于不需要检索表所有列的值,而是需要检索完整模式的子集的查询,Spark 和 Parquet 可以优化 I/O 并减少从存储读取的数据量。
此命令显示"store_sales"表有 23 列并显示它们的名称和数据类型
spark.sql("desc store_sales").show(50)
+--------------------+------------+-------+
| col_name| data_type|comment|
+--------------------+------------+-------+
| ss_sold_time_sk| int| null|
| ss_item_sk| int| null|
| ss_customer_sk| int| null|
| ss_cdemo_sk| int| null|
| ss_hdemo_sk| int| null|
| ss_addr_sk| int| null|
| ss_store_sk| int| null|
| ss_promo_sk| int| null|
| ss_ticket_number| int| null|
| ss_quantity| int| null|
| ss_wholesale_cost|decimal(7,2)| null|
| ss_list_price|decimal(7,2)| null|
| ss_sales_price|decimal(7,2)| null|
| ss_ext_discount_amt|decimal(7,2)| null|
| ss_ext_sales_price|decimal(7,2)| null|
|ss_ext_wholesale_...|decimal(7,2)| null|
| ss_ext_list_price|decimal(7,2)| null|
| ss_ext_tax|decimal(7,2)| null|
| ss_coupon_amt|decimal(7,2)| null|
| ss_net_paid|decimal(7,2)| null|
| ss_net_paid_inc_tax|decimal(7,2)| null|
| ss_net_profit|decimal(7,2)| null|
| ss_sold_date_sk| int| null|
+--------------------+------------+-------+
以下示例query(3) 与前面讨论的基线query(1) 类似,但显着的变化是query(3) 仅使用 4 列:ss_sold_date_sk、ss_item_sk、ss_list_price 和 ss_sales_price 。与需要读取整个表的query(1)相比减少了 I/O 。
// query (3), an example of column pruning
spark.sql("select ss_sold_date_sk, ss_item_sk, ss_list_price from store_sales where ss_sales_price=-1.0").collect()
在测试系统上执行查询后只读取了 22.4 GB:
•Total Time Across All Tasks: 13 min
•Locality Level Summary: Node local: 1675
•Input Size / Records: 22.4 GB / 4319943621
•Duration: 18 s
将此与基线query()1进行比较,后者访问了 185 GB 的数据。
执行计划确认查询只需要读取 ss_sold_date_sk、ss_item_sk、ss_list_price 列的数据:
5. 谓词下推
谓词下推是 Spark 和 Parquet 的另一个特性,它可以通过减少从 Parquet 文件读取的数据量来提高查询性能,谓词下推的工作原理是根据 Parquet 文件中存储的元数据评估查询中的过滤谓词,Parquet 可以选择在其文件的相关元数据部分存储统计信息(特别是列块的最小值和最大值),并且可以使用该信息做出决策,例如如果提供的过滤谓词值,如果查询中的值超出了为给定列存储的值范围则跳过读取数据块,这是一个简化的解释,还有更多细节和异常没有捕捉到,本篇博文中讨论 Parquet 内部结构部分可以找到更多详细信息。
5.1 谓词下推提高查询性能示例
这是一个示例,其中谓词下推显着提高 Parquet 上的 Spark 查询的性能。
// query (4), example of predicate push down
spark.sql("select * from store_sales where ss_quantity=-1").collect()
query(4)过滤谓词"where ss_quantity = -1",STORE_SALES 表中的谓词从不为真(ss_quantity 是正值),在这种情况下,可以进行强大的优化:Spark 可以将过滤器推送到 Parquet,而 Parquet 可以根据其元数据对其进行评估。因此,Spark 和 Parquet 可以完全跳过对数据执行 I/O,从而显着减少工作负载并提高性能。准确地说,Spark/Parquet 仍然需要访问使表读取元数据的所有文件,但这比读取数据要快几个数量级。通过比较查询 (4) 和基线查询 (1) 的执行指标,您可以看到:
•Total Time Across All Tasks: 1.0 min
•Locality Level Summary: Node local: 1675
•Input Size / Records: 16.2 MB / 0
•Duration: 3 s
执行计划显示过滤器确实下推到了Parquet:
5.2 谓词下推的例外(无法下推)
继续回到 query(1)
// query(1), baseline with full scan of store_sales
spark.sql("select * from store_sales where ss_sales_price=-1.0").collect()
如果比较query(1) 和query(4),会发现它们看起来很相似,但它们的性能却大不相同,为什么?
这是因为 Parquet 中不是所有数据类型都可以进行谓词下推,特别是对于当前版本的 Spark+Parquet(即 Parquet 版本 1.8.2,对于此处报告的 Spark 2.2.0 RC 测试)列上的谓词 DECIMAL 类型的值不会被下推,而 INT(整数)值会被下推(另见 PARQUET-281[4]),参考query(1) 在 ss_sales_price 上有一个类型为 decimal(7,2) 的过滤器,而query(4) 在 ss_quantity 上有一个 INT 类型的谓词。
关于执行计划的注意事项:query(1) 的 Spark 物理执行计划报告谓词"ss_sales_price=-1.0"实际上与query(2) 中看到的类似,这可能会产生误导,因为 Parquet 实际上不能够下推这个值。性能指标、已用时间和读取的数据量确认在query(1) 的情况下对表进行了全面扫描。
另一个重要的点是只有使用某些运算符的谓词才能作为过滤器下推到 Parquet,在query(4) 的示例中可以看到一个带有相等谓词的过滤被下推。
其他可以下推的运算符是"<、<=、>、>="。可以在源代码中找到有关 Spark 可以作为 Parquet 过滤器下推的数据类型和运算符的更多详细信息。更多详细信息请访问 ParquetFilters.scala[5] 的相关源代码片段链接。
5.3 谓词下推性能提升小的示例
这是另一个示例,可用于了解有关过滤器下推的一些限制,执行query(5) Spark 可以将谓词下推到 Parquet 但最终结果只是数据读取的少量减少,因此对性能的影响很小。
// query (5), another example of predicate push down
// This time however with small gains in performance, due to data distribution and filter value
spark.sql("select * from store_sales where ss_ticket_number=348594569").collect()
结果如下
•Total Time Across All Tasks: 50 min
•Locality Level Summary: Node local: 1675
•Input Size / Records: 173.7 GB / 4007845204
•Duration: 1.1 min
执行计划如下
了解query(4)和query(5)之间的不同行为很有用,原因在于数据:对于大多数表分区,用于过滤器"ss_ticket_number=348594569"的值是在 ss_ticket_number 的最小值和最大值之间的范围内,更准确地说Parquet 存储可用于谓词下推的元数据的粒度称为“行组RowGroup”,它是 Parquet 文件的子集。
由于数据的性质和过滤谓词的值,Parquet 发现大多数行组的过滤值都在最小值到最大值的范围内,因此最终会读取本示例中的大部分表格,对于某些分区进行谓词下推,实际读取的数据量略低于本示例中的全表扫描值,query(5) 中的 173 GB 与query(1) 中的 185 GB。
这个例子的主要目的是说明谓词下推并不能保证所有情况下都会改善性能,由于谓词下推而导致的 I/O 减少的实际收益取决于数据分布和谓词值。
可以预期随着 Parquet 的成熟,更多功能将被添加到谓词推送中,PARQUET-384将基于字典的过滤添加到 Filter2 API[6]的示例,该改进计划在Parquet 1.9.0发布。
Spark 中默认启用 Parquet 下推,如果想进一步验证,可以使用以下参数来开启功能spark.sql.parquet.filterPushdown=<true|false>
总之谓词下推是 Spark 和 Parquet 的一项功能,它可以通过减少基于 Parquet 中存储的元数据读取的数据量来帮助提高查询的性能,该元数据关于存储在列中的最小值和最大值以及每个行组的聚合,但是此功能有局限性,特别是它只能与 Parquet 和 Spark 中实现的某些数据类型和运算符一起使用。此外即使进行谓词下推,I/O 的实际减少和性能的相对增加也会有所不同,结果取决于提供的过滤器值和源表中的数据分布。
6. 使用诊断工具深入了解 Parquet 元数据
上一节中关于谓词下推的讨论触及了有关 Parquet 内部结构,在此阶段将探索 Parquet 文件的内部结构及其元数据结构,以进一步了解查询的性能,一个很好的资源是 https://parquet.apache.org/documentation/latest 上的文档
此外Parquet 源代码以代码注释的形式提供了许多其他详细信息,参见示例:ParquetOutputFormat.java[7]
另外有关 Parquet 内部结构的一些要点如下
•从层次上讲,Parquet 文件由一个或多个"行组RowGroup"组成,行组包含分组的数据"列块ColumnChunk",每列一个,每个列块包含一个或多个页面Page。•Parquet 文件有几个元数据结构,其中包含模式、列列表和存储数据的详细信息,例如列的名称和数据类型、它们的大小、记录数和基本统计信息作为最小值和最大值(对于支持此功能的数据类型,如上一节所述)。•Parquet 可以使用压缩和编码,用户可以选择使用的压缩算法(如果有)。默认情况下,Spark 使用 snappy。•Parquet 可以存储复杂的数据类型并支持嵌套结构,这是一个非常强大的功能,本文中介绍的简单示例并未涵盖。
有一些工具和实用程序可帮助了解 Parquet 文件内部结构,使用此类工具测试 Parquet 数据帮助我更好地了解 Parquet 的工作原理。
6.1 Parquet-tools
Parquet-tools 是主要 Apache Parquet 存储库的一部分,可以从 https://github.com/apache/parquet-mr/releases 下载。
此处描述的测试基于 2017 年 1 月发布的 Parquet 1.8.2 版。注意:Parquet 1.9.0 版也自 2016 年 10 月起发布,但 Spark 至少直到 Spark 2.2.0 版才使用它。
可以使用以下命令构建和打包 parquet-tools 的 jar:
cd parquet-mr-apache-parquet-1.8.2/parquet-tools
mvn package
可以使用 parquet-tools检查 HDFS 上 Parquet 文件的元数据:"hadoop jarmeta "。除了"meta"之外,parquet-tools 可用的其他命令包括:cat、head、schema、meta、dump,只需运行带有 -h 选项的 parquet-tools 即可查看语法。
$ echo "read metadata from a Parquet file in HDFS"
$ hadoop jar parquet-mr-apache-parquet-1.8.2/parquet-tools/target/parquet-tools-1.8.2.jar meta TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
输出列出了文件元数据的几个细节:
文件路径、用于写入文件的Parquet版本(本例中为 1.8.2)、附加信息(本例中为 Spark Row 元数据):
file: hdfs://XXX.XXX.XXX/user/YYY/TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
creator: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"ss_sold_time_sk","type":"integer","nullable":true,"metadata":{}},{"name":"ss_item_sk","type":"integer","nullable":true,"metadata":{}},
...omitted in the interest of space...
{"name":"ss_net_profit","type":"decimal(7,2)","nullable":true,"metadata":{}}]}
另外关于模式的元数据:
file schema: spark_schema
--------------------------------------------------------------
ss_sold_time_sk: OPTIONAL INT32 R:0 D:1
ss_item_sk: OPTIONAL INT32 R:0 D:1
ss_customer_sk: OPTIONAL INT32 R:0 D:1
ss_cdemo_sk: OPTIONAL INT32 R:0 D:1
ss_hdemo_sk: OPTIONAL INT32 R:0 D:1
ss_addr_sk: OPTIONAL INT32 R:0 D:1
ss_store_sk: OPTIONAL INT32 R:0 D:1
ss_promo_sk: OPTIONAL INT32 R:0 D:1
ss_ticket_number: OPTIONAL INT32 R:0 D:1
ss_quantity: OPTIONAL INT32 R:0 D:1
ss_wholesale_cost: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_list_price: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_sales_price: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_discount_amt: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_sales_price: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_wholesale_cost: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_list_price: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_ext_tax: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_coupon_amt: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_net_paid: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_net_paid_inc_tax: OPTIONAL INT32 O:DECIMAL R:0 D:1
ss_net_profit: OPTIONAL INT32 O:DECIMAL R:0 D:1
有关行组的元数据:
注意:如果想进一步了解,还可以使用以下命令将输出页面级别信息,parquet-tools 命令"dump --disable-data"在感兴趣的 Parquet 文件上。
6.2 Parquet_reader
Parquet_reader 这是另一个实用程序,可以帮助浏览 Parquet 文件的内部结构和元数据,特别是 parquet-cpp 显示与 Parquet 列相关的统计信息,有助于理解谓词下推。Parquet_reader 是与 Parquet-cpp 项目一起发布的实用程序,可以从 https://github.com/apache/parquet-cpp/releases 下载,此处的测试是使用 2017 年 5 月发布的 1.1.0 版本。
提示:可以使用"cmake"或者"make"构建项目,之后可以在文件夹 build/latest 中找到实用程序 parquet_reader。
下面是如何使用 parquet_reader 浏览文件元数据的示例,该工具适用于文件系统数据,因此在运行之前需要将Parquet文件从 HDFS 复制到本地文件系统:
./parquet_reader --only-metadata part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
文件元数据:与Parquet-tools情况类似,可以找到列及其数据类型的列表,但请注意未标识 DECIMAL 列。
File Name: part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet
Version: 0
Created By: parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
Total rows: 2840100
Number of RowGroups: 2
Number of Real Columns: 22
Number of Columns: 22
Number of Selected Columns: 22
Column 0: ss_sold_time_sk (INT32)
Column 1: ss_item_sk (INT32)
Column 2: ss_customer_sk (INT32)
Column 3: ss_cdemo_sk (INT32)
Column 4: ss_hdemo_sk (INT32)
Column 5: ss_addr_sk (INT32)
Column 6: ss_store_sk (INT32)
Column 7: ss_promo_sk (INT32)
Column 8: ss_ticket_number (INT32)
Column 9: ss_quantity (INT32)
Column 10: ss_wholesale_cost (INT32)
Column 11: ss_list_price (INT32)
Column 12: ss_sales_price (INT32)
Column 13: ss_ext_discount_amt (INT32)
Column 14: ss_ext_sales_price (INT32)
Column 15: ss_ext_wholesale_cost (INT32)
Column 16: ss_ext_list_price (INT32)
Column 17: ss_ext_tax (INT32)
Column 18: ss_coupon_amt (INT32)
Column 19: ss_net_paid (INT32)
Column 20: ss_net_paid_inc_tax (INT32)
Column 21: ss_net_profit (INT32)
行组元数据:这里是与第一个行组相关的元数据片段,它包含以字节为单位的总大小和行数。
--- Row Group 0 ---
--- Total Bytes 154947976 ---
Rows: 2840100---
列块元数据:与Parquet-tools类似,可以找到有关行数和压缩/未压缩大小的详细信息。此外parquet_reader 显示最小值和最大值的统计信息,还包含了空值的数量,而不同的值似乎为 0(未填充)。
Column 0
, Values: 2840100, Null Values: 66393, Distinct Values: 0
Max: 75599, Min: 28800
Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
Uncompressed Size: 5886233, Compressed Size: 2419027
Column 1
, Values: 2840100, Null Values: 0, Distinct Values: 0
Max: 32000, Min: 1
Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
Uncompressed Size: 5040503, Compressed Size: 5040853
Column 2
, Values: 2840100, Null Values: 66684, Distinct Values: 0
Max: 4599961, Min: 15
Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
Uncompressed Size: 7168827, Compressed Size: 4200678
...
值得注意的是,没有针对 DECIMAL 类型的列的统计信息,正如本文前面所讨论的,这对过滤/谓词下推有影响。
...
Column 10
, Values: 2840100 Statistics Not Set
Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
Uncompressed Size: 5113188, Compressed Size: 5036313
Column 11
, Values: 2840100 Statistics Not Set
Compression: SNAPPY, Encodings: RLE PLAIN_DICTIONARY BIT_PACKED
Uncompressed Size: 5500119, Compressed Size: 5422519
...
6.3 用于测量 Spark 指标的自定义工具:sparkMeasure
Spark 性能指标通过 WebUI 和 REST API[8] 查看,除此之外还可以使用小型定制开发工具sparkMeasure[9]来测量 Spark 任务指标和 SQL 指标,博客文章关于测量 Apache Spark 工作负载指标以进行性能故障排除[10]中对其进行了描述。
要使用 sparkMeasure,请下载 jar 或指向其在 Maven Central 上的坐标,示例如下
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.11
这是如何使用 sparkMeasure[11] 测量 Spark SQL 指标的示例
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select * from store_sales where ss_sales_price=-1").collect())
Scheduling mode = FIFO
Spark Context default degree of parallelism = 48
Aggregated Spark stage metrics:
numStages => 1
sum(numTasks) => 1675
elapsedTime => 75684 (1.3 min)
sum(stageDuration) => 75684 (1.3 min)
sum(executorRunTime) => 3525951 (59 min)
sum(executorCpuTime) => 1006093 (17 min)
sum(executorDeserializeTime) => 4410 (4 s)
sum(executorDeserializeCpuTime) => 2106 (2 s)
sum(resultSerializationTime) => 69 (69 ms)
sum(jvmGCTime) => 811933 (14 min)
sum(shuffleFetchWaitTime) => 0 (0 ms)
sum(shuffleWriteTime) => 0 (0 ms)
max(resultSize) => 2346124 (2.0 MB)
sum(numUpdatedBlockStatuses) => 48
sum(diskBytesSpilled) => 0 (0 Bytes)
sum(memoryBytesSpilled) => 0 (0 Bytes)
max(peakExecutionMemory) => 0
sum(recordsRead) => 4319943621
sum(bytesRead) => 198992404103 (185.0 GB)
sum(recordsWritten) => 0
sum(bytesWritten) => 0 (0 Bytes)
sum(shuffleTotalBytesRead) => 0 (0 Bytes)
sum(shuffleTotalBlocksFetched) => 0
sum(shuffleLocalBlocksFetched) => 0
sum(shuffleRemoteBlocksFetched) => 0
sum(shuffleBytesWritten) => 0 (0 Bytes)
sum(shuffleRecordsWritten) => 0
scala> stageMetrics.printAccumulables
Aggregated Spark accumulables of type internal.metric. Sum of values grouped by metric name
Name => sum(value) [group by name]
executorCpuTime => 1006093 (17 min)
executorDeserializeCpuTime => 2106 (2 s)
executorDeserializeTime => 4410 (4 s)
executorRunTime => 3525951 (59 min)
input.bytesRead => 198992404103 (185.0 GB)
input.recordsRead => 4319943621
jvmGCTime => 811933 (14 min)
resultSerializationTime => 69 (69 ms)
resultSize => 2346124 (2.0 MB)
SQL Metrics and other non-internal metrics. Values grouped per accumulatorId and metric name.
Accid, Name => max(value) [group by accId, name]
256, number of output rows => 4319943621
259, scan time total => 3327453 (55 min)
260, duration total => 3522020 (59 min)
7. 结论
Spark 和 Parquet 目前是许多分析平台的核心技术,这篇博文通过示例介绍了一些基本功能和工作负载,重点介绍了 Spark + Parquet 在处理大型分区表(数据仓库和分析的典型用例)时如何发挥作用,这篇文章特别介绍了分区发现、分区裁剪、数据压缩、列投影和过滤器/谓词下推。此外本文还展示了一些用于查看 Parquet 元数据的诊断工具示例(parquet-tools、parquet_reader)和测量 Spark 工作负载的工具(Spark WebUI 和自定义工具 sparkMeasure),这只是探索 Spark 和 Parquet 的开始,希望读者在自己的调查中收获更多。
网友评论