美文网首页
1.pyspark.sql.SQLContext

1.pyspark.sql.SQLContext

作者: 丫丫iii | 来源:发表于2019-01-21 10:53 被阅读0次

    Spark SQL和DataFrames重要的类有:

    • pyspark.sql.SQLContext: DataFrame和SQL方法的主入口
    • pyspark.sql.DataFrame: 将分布式数据集分组到指定列名的数据框中
    • pyspark.sql.Column :DataFrame中的列
    • pyspark.sql.Row: DataFrame数据的行
    • pyspark.sql.HiveContext: 访问Hive数据的主入口
    • pyspark.sql.GroupedData: 由DataFrame.groupBy()创建的聚合方法集
    • pyspark.sql.DataFrameNaFunctions: 处理丢失数据(空数据)的方法
    • pyspark.sql.DataFrameStatFunctions: 统计功能的方法
      -pyspark.sql.functions DataFrame:可用的内置函数
    • pyspark.sql.types: 可用的数据类型列表
    • pyspark.sql.Window: 用于处理窗口函数

    1.pyspark.sql.SQLContext

    class pyspark.sql.SQLContext(sparkContext,sqlContext=None)
    """
    SQLContext可以用来创建DataFrame、注册DataFrame表、在表上执行SQL、读取parquet文件
    1.sparkContext:支持sqlContext的sparkContext
    2.sparkContext:等于None 会创建一个sparkContext实例,非None则调用传入的参数
    """
    
    1.1
    applySchema(rdd,schema):在1.3中已过时,使用createDataFrame()代替
    
    1.2
    cacheTable(tablename):缓存表到内存中
    
    1.3
    clearCache(tablename):从内存缓存中删除所有的缓存表
    
    1.4
    createDateFrame(data,schema=None,samplingRatio=None):从元组/列表RDD或者列表或者pandas.DataFrame创建DataFrame
    """
    1.当模式是列名的列表是,每个列的类型会从数据中推断出来。
    2.当模式没有时,将尝试从数据中推断模式(列名和类型),数据应该是行或者命名元组或者字典的RDD
    3.如果模式推是必要的,samplingRratio用来确定用于模式推理的行比率。如果没有samplingRatio,将使用第一行
    """
    """
    1.data:行或者元组或者列表或者字典的RDD、list、pandas.DataFrame
    2.schema:一个结构化类型或者列名列表,默认为空
    3.samplingRatio:用于推断的行样本比率
    
    【返回】dataFrame
    
    --------------------------------------------------------------------------------------------------------------------------------------------
    【栗子】:
    1.列表
    a = [('Alice', 1)]
    b = sqlContext.createDataFrame(a).collect()
    -----------b: [Row(_1='Alice', _2=1)]---------------------------
    b = sqlContext.createDataFrame(a,['name','age']).collect()
    -----------b: [Row(age=1,name=u'Alice')]------------------------
    
    2.字典
    a=[{'name':'Alice','age':1}]
    b=sqlContext.createDateFrame(d).collect()
    -----------b: [Row(age=1,name=u'Alice')]------------------------
    
    3.RDD
    rdd = sparkContext.parallelize(a)
    b = sqlContext.CreateDataFrame(rdd).collect()
    -----------b: [Row(_1='Alice', _2=1)]---------------------------
    b = sqlContext.CreateDataFrame(rdd,['name','age']).collect()
    -----------b: [Row(age=1,name=u'Alice')]------------------------
    
    4.pandas.DataFrame
    c = sqlContext.CreateDataFrame(b.toPandas()).collect()
    -----------c: [Row(_1='Alice', _2=1)]---------------------------
    c = sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  
    -----------c: [Row(0=1, 1=2)]------------------------
    """
    
    1.5
    createExternalTable(tablename,path=None,soruce=None,schema=None,**options):创建基于数据源中的数据的外部表
    """
    1.数据源由源和一组选项指定。如果为指定源,那么将使用由spark.sql.sources.default配置的默认的数据源配置,通常一个模式可以被提供作为返回的DataFrame的模式,然后创建外部表
    
    【返回】:DataFrame
    """
    
    1.6
    dropTempTable(tablename):从目录中删除临时表
    """
    sqlContext.registerDataFrameAsTable(b,'table1')
    sqlContext.dropTempTable('table1')
    """
    
    1.7
    getConf(key,defaultValue):返回指定键的sparksqk配置属性值
    
    1.8
    inferSchema(rdd,samplingRatio=None):在1.3中已过时,使用createDataFrame()代替。
    
    1.9
    jsonRDD(rdd,schema=None,samplingRatio=1.0):从一个已经存在的RDD中加载数据,这个RDD中的每个元素均为一个json字符串。如果提供了模式,将给定的模式应用到这个json数据集。否则他根据数据的采样比例来确定模式
    """
    json = sc.parallelize(
    """{"name":"jack","addr":{"city":"beijing","mail":"10001"}}""",
    """{"name":"jahn","addr":{"city":"shanghai","mail":"10002"}}""")
    df1 = sqlContext.jsonRDD(json).collect()
    -----df1:[Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]------
    df2 = sqlContext.jsonRDD(json,df1.schema).collect()
    -----df2:[Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]------
    """
    
    1.10
    jsonFile(path,schema=None,samplingRatio=1.0):从一个文本文件中加载数据,这个文件的每一行均为JSON字符转
    注:在1.4中已过时,使用DataFrameReader.json()代替。
    
    1.11
    load(path,source=None,schema=None,**options):返回数据源中数据集为DataFrame
    注:在1.4中已过时,使用DataFrameReader.load()代替。
    
    1.12
    newSession():返回一个新的SQLContext做为一个新的对话框,这个回话有单独的SQLConf,注册临时表和UDFs,但共享sparkcontext和缓存表
    
    1.13
    parquetFile(*paths):加载parquet文件,返回结果为DataFrame
    注:在1.4中已过时,使用DataFrameReader.parquet()代替。
    
    1.14
    range(start,end=None,step=1,numPartitions=None):创建只有一个名为id的长类型列的DataFrame,包含从开始到结束的按照一定步长的独立元素。
    1.start:开始值
    2.end:结束值
    3.step:增量值
    4.numPartitions:DataFrame分区数
    【返回】DataFrame
    
    sqlContext.range(1,7,2).collect()
    -----------[Row(id=1), Row(id=3), Row(id=5)]-----------
    sqlContext.range(3).collect()
    -----------[Row(id=0), Row(id=1), Row(id=2)]-----------
    
    1.15
    read(*paths):返回一个DataFrameReader,可用于读取数据为DataFrame.
    
    1.16
    registerDataFrameAsTable(DataFrame,tableName):注册给定的DataFrame作为目录中的临时表,临时表只在当前SQLContext实例有效期间存在。
    """
    sqlContext.registerDataFrameAsTable(b,'table1')
    """
    
    1.17
    registerFunction(name,f,returnType=StringType):注册python方法包括()
    """
    sqlContext.registerDataFrameAsTable(b,'table1')
    """
    
    1.18
    registerFunction(name,f,returnType=StringType):注册python方法(包括lambda方法),作为UDF,这样可以在SQL statements中使用。除了名称和函数本身之外,还可以选择性的指定返回类型。当返回类型没有指定时,默认自动转换为字符串
    1.name:UDF名称
    2.f:python方法
    3.returnTyoe:数据类型对象
    """
    sqlContext.registerFunction("stringLengthString",lambda s:len(x))
    sqlContext.sql("SELECT stringLengthString('test')").collect()
    -----------------[Row(_c0=u'4')]----------------------------------
    from pyspark.sql.types import IntergerType
    sqlContext.registerFunction("stringLengthInt",lambda x:len(x),IntegerType())
    -----------------[Row(_c0=4)]------------------------------------
    from pyspark.sql.types import IntegerType
    sqlContext.udf.register("sringLengthInt",lambda x:len(x),IntegerType())
    sqlContext.sql("SELECT stringLengthInt('test')").collect()
    -----------------[Row(_c0=4)]-----------------------------------------------
    """
    
    1.19
    setConf(key,value):设置给定的SparkSQL配置属性
    
    1.20
    sql(sqlQUery):返回DataFrame代表给定的查询结果
    1.sqlQuery:sql查询语句
    """
    a = [(1,'row1'),(2,'row2'),(3,'row3')]
    df = sqlContext.createDataFrame(a,['field1','field2'])
    sqlContext.registerDataFrameAsTable(df,'table1')
    df2 = sqlContext.sql('select field1 as f1, field2 as f2 from table2').collect()
    --------------------[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]---
    """
    
    1.21
    table(tableName):返回指定表的DataFrame
    """
    a = [(1,'row1'),(2,'row2'),(3,'row3')]
    df = sqlContext.createDataFrame(a,['field1','field2'])
    sqlContext.registerDataFrameAsTable(df,'table1')
    df2 = sqlContext.table('table1')
    sorted(df.collect()) == sorted(df2.collect())
    --------------------True---
    """
    
    1.22
    tableName(dbName=None):返回数据库的表名称列表,dbName:数据库名称,默认为当前数据库
    """
    a = [(1,'row1'),(2,'row2'),(3,'row3')]
    df = sqlContext.createDataFrame(a,['field1','field2'])
    sqlContext.registerDataFrameAsTable(df,'table1')
    "table1" in sqlContext.tableNames()
    --------------------True---
    "table1" in sqlContext.tableNames("db")
    --------------------True---
    """
    
    1.23
    tables(dbName=None):从给定的数据库返回一个包含表名称的DataFrame,返回的DataFrame包含两列:表名称和是否临时表(一个BOOL类型的列,标识表是否为临时表)
    """
    a = [(1,'row1'),(2,'row2'),(3,'row3')]
    df = sqlContext.createDataFrame(a,['field1','field2'])
    sqlContext.registerDataFrameAsTable(df,'table1')
    df2 = sqlContext.tables()
    df2.filter("tableName = 'table1'").first()
    --------------------Row(tableName=u'table1', isTemporary=True)---
    """
    
    1.24
    udf:返回一个注册的UDF为UDFRegistration,
    
    1.25
    uncacheTable(tableName):从内存的缓存表中移除指定表
    
    1.25
    pyspark.sql.HiveContext(sparkContext,hiveContext=None):
    

    引用原文

    相关文章

      网友评论

          本文标题:1.pyspark.sql.SQLContext

          本文链接:https://www.haomeiwen.com/subject/qanqjqtx.html