美文网首页工作生活
pyspark DataFrame笔记

pyspark DataFrame笔记

作者: AsdilFibrizo | 来源:发表于2019-07-04 15:30 被阅读0次
  • 相较于rdd,在数据挖掘中更常用的数据格式是DataFrame,由于Catalyst优化器的原因,DataFrame在python上并不比scala上慢多少
# 引入必要包
from pyspark.sql import SparkSession
from pyspark.sql import types
spark = SparkSession.builder.master("local").appName("learnsparkdf").enableHiveSupport().getOrCreate()
sc = spark.sparkContext

创建DataFrame

# 使用sc创建df
#方法一:通过json创建
stringJSONRDD = sc.parallelize([""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }"""]
)
df = spark.read.json(stringJSONRDD)
df.show()
[out]: 
+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+
# 方法二,通过sc创建,通常要指定列名不然会变成[-1,-2]
list_rdd = sc.parallelize([('TOM', 23), ('JIM', 18), ('BOSE', 50), ('JAME',23), ('JAM')],4)
df2 = spark.createDataFrame(list_rdd)
df2.show()
print(df2.schema) # ***默认int数据类型为LongType
[out]:
+----+---+
|  _1| _2|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
StructType(List(StructField(_1,StringType,true),StructField(_2,LongType,true)))

# 需要注意的是整数位LongType,与FloatType和DoubleType不能隐式转换
schema = types.StructType([
  types.StructField('Name', types.StringType(), True), # 列名,数据类型,能否为空
  types.StructField('Age', types.ShortType(), True),
])
df2 = spark.createDataFrame(list_rdd, schema)
df2.show()
print(df2.schema)
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
StructType(List(StructField(Name,StringType,true),StructField(Age,ShortType,true)))

数据类型总共有一下类型
"DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
"TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType",
"LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"

创建临时表

dataframe的操作通常会使用sql语句完成,下面有四个创建表的方法
#df.createGlobalTempView("tempViewName")           创建一个全局临时表,生命周期为程序的生命周期  **使用的时候 global_temp.tempViewName
#df.createOrReplaceGlobalTempView("tempViewName")  创建或者替换一个全局临时表,生命周期为程序的生命周期
#df.createOrReplaceTempView("tempViewName")        创建一个临时表,生命周期为当前SparkSession的生命周期
#df.createTempView("tempViewName")                 创建或者替换一个临时表,生命周期为当前SparkSession的生命周期

# 删除临时表
# spark.catalog.dropTempView("tempViewName")
# spark.catalog.dropGlobalTempView("tempViewName")

创建表之后,剩余的操作就和sql基本一样,一般来说sql操作都会返回一个新的dataframe

查看临时表

# 创建临时表,查看信息,
df2.createOrReplaceTempView('df')
spark.sql("select * from df").show()   #***注意返回的也是一个dataframe
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
| JIM| 18|
|BOSE| 50|
|JAME| 23|
+----+---+
[Row(Name='TOM', Age=23),
 Row(Name='JIM', Age=18),
 Row(Name='BOSE', Age=50),
 Row(Name='JAME', Age=23)]

# 取出数据后
ret= spark.sql("select * from df").collect() # 也可以直接取出来
ret[0]['name'] 类似于字典和列表综合,这两种方法都可以获取元素
ret[0][0]          类似于字典和列表综合,这两种方法都可以获取元素

数据结构

printSchema()函数
输出dataframe schema结构
若不指定dataframe结构,系统会自动推断数据类型
df.printSchema()
[out]:
root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

API操作

dataframe API查询
dataframe可以通过take(),show()展示结果
可以使用select filter选择过滤数据,还有很多函数
df2.select("*").filter("age>18").show()
[out]:
+----+---+
|Name|Age|
+----+---+
| TOM| 23|
|BOSE| 50|
|JAME| 23|
+----+---+

保存文件

# 保存文件
# 一般来说可以存储为csv,json,不过更常见的是使用parquet存储
# Parquet仅仅是一种存储格式,它是语言、平台无关的,并且不需要和任何一种数据处理框架绑定,目前能够和Parquet适配的组件包括下面这些,可以看出基本上通常使用的查询引擎和计算框架都已适配,并且可以很方便的将其它序列化工具生成的数据转换成Parquet格式
df.rdd.getNumPartitions() # 获取分区数目
df.write.parquet('/FileStore/tables/testpar', mode = 'overwrite') # 以parquet格式保存数据
df.toPandas().to_csv('/FileStore/tables/testpar2.csv')# 以csv格式保存数据

from sklearn.externals import joblib
joblib.dump(df.toJSON().collect(), '/FileStore/tables/testpar3.json') # 以json格式保存


# 保存后的/FileStore/tables/testpar是一个文件加,有多少个分区数目就有多少个文件***********
df2 = spark.read.parquet('/FileStore/tables/testpar')# 读取parquet格式文件

jupyter 代码

相关文章

网友评论

    本文标题:pyspark DataFrame笔记

    本文链接:https://www.haomeiwen.com/subject/aqhrhctx.html