Spark SQL和DataFrames重要的类有:
- pyspark.sql.SQLContext: DataFrame和SQL方法的主入口
- pyspark.sql.DataFrame: 将分布式数据集分组到指定列名的数据框中
- pyspark.sql.Column :DataFrame中的列
- pyspark.sql.Row: DataFrame数据的行
- pyspark.sql.HiveContext: 访问Hive数据的主入口
- pyspark.sql.GroupedData: 由DataFrame.groupBy()创建的聚合方法集
- pyspark.sql.DataFrameNaFunctions: 处理丢失数据(空数据)的方法
- pyspark.sql.DataFrameStatFunctions: 统计功能的方法
-pyspark.sql.functions DataFrame:可用的内置函数
- pyspark.sql.types: 可用的数据类型列表
- pyspark.sql.Window: 用于处理窗口函数
1.pyspark.sql.SQLContext
class pyspark.sql.SQLContext(sparkContext,sqlContext=None)
"""
SQLContext可以用来创建DataFrame、注册DataFrame表、在表上执行SQL、读取parquet文件
1.sparkContext:支持sqlContext的sparkContext
2.sparkContext:等于None 会创建一个sparkContext实例,非None则调用传入的参数
"""
1.1
applySchema(rdd,schema):在1.3中已过时,使用createDataFrame()代替
1.2
cacheTable(tablename):缓存表到内存中
1.3
clearCache(tablename):从内存缓存中删除所有的缓存表
1.4
createDateFrame(data,schema=None,samplingRatio=None):从元组/列表RDD或者列表或者pandas.DataFrame创建DataFrame
"""
1.当模式是列名的列表是,每个列的类型会从数据中推断出来。
2.当模式没有时,将尝试从数据中推断模式(列名和类型),数据应该是行或者命名元组或者字典的RDD
3.如果模式推是必要的,samplingRratio用来确定用于模式推理的行比率。如果没有samplingRatio,将使用第一行
"""
"""
1.data:行或者元组或者列表或者字典的RDD、list、pandas.DataFrame
2.schema:一个结构化类型或者列名列表,默认为空
3.samplingRatio:用于推断的行样本比率
【返回】dataFrame
--------------------------------------------------------------------------------------------------------------------------------------------
【栗子】:
1.列表
a = [('Alice', 1)]
b = sqlContext.createDataFrame(a).collect()
-----------b: [Row(_1='Alice', _2=1)]---------------------------
b = sqlContext.createDataFrame(a,['name','age']).collect()
-----------b: [Row(age=1,name=u'Alice')]------------------------
2.字典
a=[{'name':'Alice','age':1}]
b=sqlContext.createDateFrame(d).collect()
-----------b: [Row(age=1,name=u'Alice')]------------------------
3.RDD
rdd = sparkContext.parallelize(a)
b = sqlContext.CreateDataFrame(rdd).collect()
-----------b: [Row(_1='Alice', _2=1)]---------------------------
b = sqlContext.CreateDataFrame(rdd,['name','age']).collect()
-----------b: [Row(age=1,name=u'Alice')]------------------------
4.pandas.DataFrame
c = sqlContext.CreateDataFrame(b.toPandas()).collect()
-----------c: [Row(_1='Alice', _2=1)]---------------------------
c = sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
-----------c: [Row(0=1, 1=2)]------------------------
"""
1.5
createExternalTable(tablename,path=None,soruce=None,schema=None,**options):创建基于数据源中的数据的外部表
"""
1.数据源由源和一组选项指定。如果为指定源,那么将使用由spark.sql.sources.default配置的默认的数据源配置,通常一个模式可以被提供作为返回的DataFrame的模式,然后创建外部表
【返回】:DataFrame
"""
1.6
dropTempTable(tablename):从目录中删除临时表
"""
sqlContext.registerDataFrameAsTable(b,'table1')
sqlContext.dropTempTable('table1')
"""
1.7
getConf(key,defaultValue):返回指定键的sparksqk配置属性值
1.8
inferSchema(rdd,samplingRatio=None):在1.3中已过时,使用createDataFrame()代替。
1.9
jsonRDD(rdd,schema=None,samplingRatio=1.0):从一个已经存在的RDD中加载数据,这个RDD中的每个元素均为一个json字符串。如果提供了模式,将给定的模式应用到这个json数据集。否则他根据数据的采样比例来确定模式
"""
json = sc.parallelize(
"""{"name":"jack","addr":{"city":"beijing","mail":"10001"}}""",
"""{"name":"jahn","addr":{"city":"shanghai","mail":"10002"}}""")
df1 = sqlContext.jsonRDD(json).collect()
-----df1:[Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]------
df2 = sqlContext.jsonRDD(json,df1.schema).collect()
-----df2:[Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]------
"""
1.10
jsonFile(path,schema=None,samplingRatio=1.0):从一个文本文件中加载数据,这个文件的每一行均为JSON字符转
注:在1.4中已过时,使用DataFrameReader.json()代替。
1.11
load(path,source=None,schema=None,**options):返回数据源中数据集为DataFrame
注:在1.4中已过时,使用DataFrameReader.load()代替。
1.12
newSession():返回一个新的SQLContext做为一个新的对话框,这个回话有单独的SQLConf,注册临时表和UDFs,但共享sparkcontext和缓存表
1.13
parquetFile(*paths):加载parquet文件,返回结果为DataFrame
注:在1.4中已过时,使用DataFrameReader.parquet()代替。
1.14
range(start,end=None,step=1,numPartitions=None):创建只有一个名为id的长类型列的DataFrame,包含从开始到结束的按照一定步长的独立元素。
1.start:开始值
2.end:结束值
3.step:增量值
4.numPartitions:DataFrame分区数
【返回】DataFrame
sqlContext.range(1,7,2).collect()
-----------[Row(id=1), Row(id=3), Row(id=5)]-----------
sqlContext.range(3).collect()
-----------[Row(id=0), Row(id=1), Row(id=2)]-----------
1.15
read(*paths):返回一个DataFrameReader,可用于读取数据为DataFrame.
1.16
registerDataFrameAsTable(DataFrame,tableName):注册给定的DataFrame作为目录中的临时表,临时表只在当前SQLContext实例有效期间存在。
"""
sqlContext.registerDataFrameAsTable(b,'table1')
"""
1.17
registerFunction(name,f,returnType=StringType):注册python方法包括()
"""
sqlContext.registerDataFrameAsTable(b,'table1')
"""
1.18
registerFunction(name,f,returnType=StringType):注册python方法(包括lambda方法),作为UDF,这样可以在SQL statements中使用。除了名称和函数本身之外,还可以选择性的指定返回类型。当返回类型没有指定时,默认自动转换为字符串
1.name:UDF名称
2.f:python方法
3.returnTyoe:数据类型对象
"""
sqlContext.registerFunction("stringLengthString",lambda s:len(x))
sqlContext.sql("SELECT stringLengthString('test')").collect()
-----------------[Row(_c0=u'4')]----------------------------------
from pyspark.sql.types import IntergerType
sqlContext.registerFunction("stringLengthInt",lambda x:len(x),IntegerType())
-----------------[Row(_c0=4)]------------------------------------
from pyspark.sql.types import IntegerType
sqlContext.udf.register("sringLengthInt",lambda x:len(x),IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()
-----------------[Row(_c0=4)]-----------------------------------------------
"""
1.19
setConf(key,value):设置给定的SparkSQL配置属性
1.20
sql(sqlQUery):返回DataFrame代表给定的查询结果
1.sqlQuery:sql查询语句
"""
a = [(1,'row1'),(2,'row2'),(3,'row3')]
df = sqlContext.createDataFrame(a,['field1','field2'])
sqlContext.registerDataFrameAsTable(df,'table1')
df2 = sqlContext.sql('select field1 as f1, field2 as f2 from table2').collect()
--------------------[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]---
"""
1.21
table(tableName):返回指定表的DataFrame
"""
a = [(1,'row1'),(2,'row2'),(3,'row3')]
df = sqlContext.createDataFrame(a,['field1','field2'])
sqlContext.registerDataFrameAsTable(df,'table1')
df2 = sqlContext.table('table1')
sorted(df.collect()) == sorted(df2.collect())
--------------------True---
"""
1.22
tableName(dbName=None):返回数据库的表名称列表,dbName:数据库名称,默认为当前数据库
"""
a = [(1,'row1'),(2,'row2'),(3,'row3')]
df = sqlContext.createDataFrame(a,['field1','field2'])
sqlContext.registerDataFrameAsTable(df,'table1')
"table1" in sqlContext.tableNames()
--------------------True---
"table1" in sqlContext.tableNames("db")
--------------------True---
"""
1.23
tables(dbName=None):从给定的数据库返回一个包含表名称的DataFrame,返回的DataFrame包含两列:表名称和是否临时表(一个BOOL类型的列,标识表是否为临时表)
"""
a = [(1,'row1'),(2,'row2'),(3,'row3')]
df = sqlContext.createDataFrame(a,['field1','field2'])
sqlContext.registerDataFrameAsTable(df,'table1')
df2 = sqlContext.tables()
df2.filter("tableName = 'table1'").first()
--------------------Row(tableName=u'table1', isTemporary=True)---
"""
1.24
udf:返回一个注册的UDF为UDFRegistration,
1.25
uncacheTable(tableName):从内存的缓存表中移除指定表
1.25
pyspark.sql.HiveContext(sparkContext,hiveContext=None):
引用原文
网友评论