Select操作
在PySpark中,select()函数是用来从DataFrame结构中选择一个或多个列,同样可以选择嵌套的列。select()在PySpark中是一个transformation函数,它返回一个包含指定列的新的DataFrame。
首先,我们先创建一个DataFrame。
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)
输出如下:
选取单列
我们可以通过给select()函数传入列的名称来筛选指定的列。由于DataFrame是不可变的,因此此函数会返回一个包含选定列的新DataFrame。
df.select("lastname").show()
结果:
选取多行
如果要选取多列的话,只需要把每个列的名称传入select()函数即可。
df.select("firstname", "lastname").show()
结果:
除此之外,还可以使用DataFrame对象名称来筛选:
df.select(df.firstname, df.lastname).show()
也可以使用col函数来指定列:
from pyspark.sql.functions import col
df.select(col("firstname"), col("lastname")).show()
结果与上面是一样的。
选取嵌套的列
我们先创建一个带有嵌套列结构的DataFrame,如果你不清楚怎么创建,可以先看下这篇文章。
data = [
(("James",None,"Smith"),"OH","M"),
(("Anna","Rose",""),"NY","F"),
(("Julia","","Williams"),"OH","F"),
(("Maria","Anne","Jones"),"NY","M"),
(("Jen","Mary","Brown"),"NY","M"),
(("Mike","Mary","Williams"),"OH","M")
]
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('state', StringType(), True),
StructField('gender', StringType(), True)
])
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate=False) # shows all columns
输出:
在第一列“name”中,又包含了3个子列“firstname”、“middlename”、“lastname”。我们先选择"name"列,看一下输出:
df2.select("name").show(truncate=False)
结果:
可以看到,它直接显示出了第一列,那么如果我们想选取嵌套在第一列中的某一列,可以使用以下方式:
df2.select("name.firstname","name.lastname").show(truncate=False)
输出:
当然也可以获取嵌套在第一列中的所有列:
df2.select("name.*").show(truncate=False)
输出:
Collect操作
PySpark的collect()操作是用来将所有结点中的数据收集到驱动结点上(PySpark基于分布式架构)。因此collect()操作一般用于小型数据及上,在大型数据及上使用可能会导致内存不足。
还是使用前一节定义的数据:
df.show()
结果:
使用collect()函数来收集数据:
datacollect = df.collect()
print(type(datacollect))
print(datacollect)
结果:
可以看到collect()返回的是一个列表,列表的元素是Row类型,一个Row对象代表的就是DataFrame中的一行。
对于Row对象,我们可以使用列名称来访问其中的数据,如下:
for row in datacollect:
print(row['firstname'] + ': ' + row['state'])
结果:
Select() VS Collect()
- select()返回一个包含指定列的新的DataFrame,而collect()以列表形式返回整个数据集。
- select()是一个transformation操作,而collect()是一个action操作。
网友评论