美文网首页大数据开发
大数据开发:Spark框架Dataset API

大数据开发:Spark框架Dataset API

作者: 成都加米谷大数据 | 来源:发表于2021-06-07 17:53 被阅读0次

Spark作为大数据技术生态当中必学的一个框架,其重要程度早已不用特意去强调,在现有的大数据技术生态体系当中,Spark仍然是大数据开发工程师们需要掌握的重要技能。今天的大数据开发学习分享,我们就主要来讲讲Spark框架Dataset API。

Spark2.0将DataFrame和Dataset API统一为具有类似接口的结构化API,因此开发人员只需要学习一组API。数据集具有两个特性:类型化的和非类型化的API,如下图所示。

从概念上讲,我们可以将Scala中的DataFrame视为通用对象集合Dataset[Row]的别名,其中Row是通用非类型JVM对象,可能包含不同类型的字段。相比之下,Dataset是Scala中或Java中的强类型JVM对象的集合。或者,如Dataset文档所说,Dataset是:

特定域对象的强类型集合,可以使用函数或关系操作进行转换。Scala中的每个Dataset都有一个称为DataFrame的无类型视图,它是一个Dataset Row。

1、有类型的对象、非类型的对象和通用行

在Spark支持的语言中,数据集只在Java和Scala中有意义,而在Python和R中,只有DataFrame有意义。这是因为Python和R不是编译时类型安全的;类型是在执行期间动态推断或分配的,而不是在编译时动态分配的。在Scala和Java中,情况正好相反:类型在编译时绑定到变量和对象。然而,在Scala中,DataFrame只是非类型Dataset[Row]的别名。

Row是Spark中的通用对象类型,它包含可以使用索引访问的混合类型的集合。在内部,Spark会操作Row对象,并将它们转换为相应的等效类型。例如,对于Scala或Java和Python,Row中的一个整数字段将分别映射或转换为整数类型:

// In Scala

import org.apache.spark.sql.Row

val row = Row(350, true, "Learning Spark 2E", null)

# In Python

from pyspark.sql import Row

row = Row(350, True, "Learning Spark 2E", None)

使用行对象的索引,可以使用公共getter方法访问各个字段:

// In Scala

row.getInt(0)

res23: Int = 350

row.getBoolean(1)

res24: Boolean = true

row.getString(2)

res25: String = Learning Spark 2E

# In Python

row[0]

Out[13]: 350

row[1]

Out[14]: True

row[2]

Out[15]: 'Learning Spark 2E'

相比之下,类型化对象是JVM中实际的Java或Scala类对象。数据集中的每个元素都映射到一个JVM对象。

2、创建DataSet

与从数据源创建DataFrame一样,在创建数据集时,你必须知道schema。换句话说,你需要了解数据类型。尽管使用JSON和CSV数据可以推断出schema,但对于大型数据集,这是资源密集型的(成本昂贵),非常消耗资源。

在Scala中创建数据集时,为结果数据集指定schema最简单的方法是使用样例类(Case classes)。在Java中,使用JavaBean类(我们在第6章中进一步讨论JavaBean和Scala样例类)。

Scala: 样例类(Case classes)

当你希望将自己的域中特定的对象实例化为数据集时,你可以通过在Scala中定义一个样例类来实例化。作为一个例子,让我们查看JSON文件中从物联网设备读取的集合。

我们的文件有几行JSON字符串,外观如下:

{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip":

"80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude":

53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21,

"humidity": 65, "battery_level": 8, "c02_level": 1408,"lcd": "red",

"timestamp" :1458081226051}

要将每个JSON条目表示为DeviceIoTData,一种特定领域的对象,我们可以定义一个Scala样例类:

case class DeviceIoTData (battery_level: Long, c02_level: Long,

cca2: String, cca3: String, cn: String, device_id: Long,

device_name: String, humidity: Long, ip: String, latitude: Double,

lcd: String, longitude: Double, scale:String, temp: Long,

timestamp: Long)

一旦定义,我们可以使用它读取文件并将返回的内容Dataset[Row]转换为Dataset[DeviceIoTData](输出被截断以适合页面):

// In Scala

val ds = spark.read

.json("/databricks-datasets/learning-spark-v2/iot-devices/iot_devices.json")

.as[DeviceIoTData]

ds: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level...]

ds.show(5, false)

+-------------|---------|----|----|-------------|---------|---+

|battery_level|c02_level|cca2|cca3|cn |device_id|...|

+-------------|---------|----|----|-------------|---------|---+

|8 |868 |US |USA |United States|1 |...|

|7 |1473 |NO |NOR |Norway |2 |...|

|2 |1556 |IT |ITA |Italy |3 |...|

|6 |1080 |US |USA |United States|4 |...|

|4 |931 |PH |PHL |Philippines |5 |...|

+-------------|---------|----|----|-------------|---------|---+

only showing top 5 rows

3、Dataset操作

就像你可以在DataFrame上执行转换和操作一样,你也可以使用数据集。根据操作类型的不同,操作结果将会有所不同:

// In Scala

val filterTempDS = ds.filter({d => {d.temp > 30 && d.humidity > 70})

filterTempDS: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level...]

filterTempDS.show(5, false)

+-------------|---------|----|----|-------------|---------|---+

|battery_level|c02_level|cca2|cca3|cn |device_id|...|

+-------------|---------|----|----|-------------|---------|---+

|0 |1466 |US |USA |United States|17 |...|

|9 |986 |FR |FRA |France |48 |...|

|8 |1436 |US |USA |United States|54 |...|

|4 |1090 |US |USA |United States|63 |...|

|4 |1072 |PH |PHL |Philippines |81 |...|

+-------------|---------|----|----|-------------|---------|---+

only showing top 5 rows

在此查询中,我们使用一个函数作为数据集方法filter()的参数。这是一个具有很多签名的重载方法。我们使用的版本采用filter(func: (T) > Boolean): Dataset[T] lambda函数func: (T) > Boolean作为参数。

lambda函数的参数是类型为DeviceIoTData的JVM对象。这样,我们可以使用点(.)表示法访问其各个数据字段,就像在Scala类或JavaBean中一样。

另一件需要注意的事情是,对于DataFrame,你将filter()条件表示为类似SQL的DSL操作,这些操作是与语言无关的。对于数据集,我们利用原生语言的表达式作为Scala或Java代码。

总的来说,我们可以在数据集上执行filter(),map(),groupBy(),select(),take()这些操作,与DataFrame上的操作相似。在某种程度上,数据集与RDD相似,因为它们提供了与上述方法类似的接口以及编译时安全性,但具有更容易读取和面向对象的编程接口。

当我们使用数据集时,底层的Spark SQL引擎会处理JVM对象的创建、控制版本、序列化和反序列化。它还借助数据集编码器来处理Java外堆内存管理。

关于大数据开发学习,Spark框架Dataset API,以上就为大家做了简单的介绍了。Spark在大数据学习当中,是非常重要的一块,而API编程,是必须要掌握的重要基础,建议多家练习。

相关文章

网友评论

    本文标题:大数据开发:Spark框架Dataset API

    本文链接:https://www.haomeiwen.com/subject/rctreltx.html