美文网首页
中文文档 pyspark.sql.DataFrame

中文文档 pyspark.sql.DataFrame

作者: cassie_xs | 来源:发表于2020-03-28 14:36 被阅读0次

    class pyspark.sql.DataFrame(jdf, sql_ctx)

    分布式的收集数据分组到命名列中。

    一个DataFrame相当于在Spark SQL中一个相关的表,可在SQLContext使用各种方法创建,

    2.1 agg(*exprs)

    没有组的情况下聚集整个DataFrame (df.groupBy.agg()的简写)。

    >>>l=[('cassie',5),('beiwang',4),('xs',2)]

    >>>df = sqlContext.createDataFrame(l,['name','age'])

    >>>df.agg({"age": "max"}).collect()[Row(max(age)=5)]

    >>>from pyspark.sql importfunctions as F

    >>> df.agg(F.min(df.age)).collect()

    [Row(min(age)=2)]

    2.2 alias(alias)

    In [57]: l = [('cassie',2), ('beiwang',3)]

    In [58]: df = sqlContext.createDataFrame(l,['name', 'age'])

    In [59]: from pyspark.sql.functions import *

    In [60]: df1 = df.alias('df1')

    In [61]: df2 = df.alias('df2')

    In [62]: join_df = df1.join(df2, col("df1.name")==col("df2.name"), 'inner')

    In [63]: join_df.select("df1.name")

    Out[63]: DataFrame[name: string]

    In [64]: join_df.select(col("df1.name"))

    Out[64]: DataFrame[name: string]

    In [65]: join_df.select(col("df1.name")).collect()

    Out[65]: [Row(name=u'beiwang'), Row(name=u'cassie')]

    In [66]: join_df.select("df1.name").collect()

    Out[66]: [Row(name=u'beiwang'), Row(name=u'cassie')]

    2.3 cache()

    用默认的存储级别缓存数据(MEMORY_ONLY_SER).

    2.4 coalesce(numPartitions)

    返回一个有确切的分区数的分区的新的DataFrame。

    与在一个RDD上定义的合并类似, 这个操作产生一个窄依赖。 如果从1000个分区到100个分区,不会有shuffle过程, 而是每100个新分区会需要当前分区的10个。

    2.5 collect()

    返回所有的记录数为行的列表。

    >>> df.collect()

    [Row(name=u'cassie', age=2), Row(name=u'beiwang', age=3)]

    2.6 columns

    返回所有列名的列表。

    >>> df.columns

    ['age','name']

    2.7 corr(col1, col2, method=None)

    计算一个DataFrame相关的两列为double值。通常只支持皮尔森相关系数。DataFrame.corr()和DataFrameStatFunctions.corr()类似。

    参数:●  col1 – 第一列的名称

        ●  col2 – 第二列的名称

        ●  method – 相关方法.当前只支持皮尔森相关系数

    df.stat.corr('age','hobby')

    2.8 count()

    返回DataFrame的行数。

    >>> df.count()2

    2.9 cov(col1, col2)

    计算由列名指定列的样本协方差为double值。DataFrame.cov()和DataFrameStatFunctions.cov()类似。

    参数:●  col1 – 第一列的名称

    ●  col2 – 第二列的名称

    df.stat.cov('hobby','age')

    2.10 crosstab(col1, col2)

    计算给定列的分组频数表,也称为相关表。每一列的去重值的个数应该小于1e4.最多返回1e6个非零对.每一行的第一列会是col1的去重值,列名称是col2的去重值。第一列的名称是$col1_$col2. 没有出现的配对将以零作为计数。DataFrame.crosstab() and DataFrameStatFunctions.crosstab()类似。

    参数:●  col1 – 第一列的名称. 去重项作为每行的第一项。

    ●  col2 – 第二列的名称. 去重项作为DataFrame的列名称。

    df.stat.crosstab("hobby", "age").show()

    +---------+---+---+

    |hobby_age|  2|  3|

    +---------+---+---+

    |        5|  0|  1|

    |      10|  1|  0|

    +---------+---+---+

    2.11 cube(*cols)

    创建使用指定列的当前DataFrame的多维立方体,这样可以聚合这些数据。

     df.cube('hobby', df.age).count().show()

    +-----+----+-----+

    |hobby| age|count|

    +-----+----+-----+

    |    5|null|    1|

    | null|null|    2|

    |  10|null|    1|

    |  10|  2|    1|

    | null|  2|    1|

    |    5|  3|    1|

    | null|  3|    1|

    +-----+----+-----+

    2.12 describe(*cols)

    计算数值列的统计信息。

    包括计数,平均,标准差,最小和最大。如果没有指定任何列,这个函数计算统计所有数值列。

    2.13 distinct()

    返回行去重的新的DataFrame。

    df.distinct().count()

    2.14 drop(col)

    返回删除指定列的新的DataFrame

    df.drop('hobby').collect()

    2.15 dropDuplicates(subset=None)

    返回去掉重复行的一个新的DataFrame,通常只考虑某几列。

    drop_duplicates()和dropDuplicates()类似。

    df.dropDuplicates().show()

    >>>df.dropDuplicates(['name','height']).show()

    2.16 drop_duplicates(subset=None)

    与以上相同。

    2.17 dropna(how='any', thresh=None, subset=None)

    返回一个删除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()类似。

    参数:●  how – 'any'或者'all'。如果'any',删除包含任何空值的行。如果'all',删除所有值为null的行。

    ● thresh – int,默认为None,如果指定这个值,删除小于阈值的非空值的行。这个会重写'how'参数。

    ●  subset – 选择的列名称列表。

    df.na.drop().show()

    dfnew.na.drop(how='all',thresh=2).show()

    2.18 dtypes

    返回所有列名及类型的列表。

    >>> df.dtypes

    [('age','int'), ('name','string')]

    2.19 explain(extended=False)

    将(逻辑和物理)计划打印到控制台以进行调试。

    参数:●  extended – boolean类型,默认为False。如果为False,只打印物理计划。

    df.explain(True)

    2.20 fillna(value, subset=None)

    替换空值,和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。

    参数:●  value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射,替换值必须是int,long,float或者string.

    ●  subset - 要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含一个非字符串的列,这个非字符串的值会被忽略。

    dfnew.na.fill(50).show()

    dfnew.na.fill({'age': 50, 'name': 'unknown'}).show()

    2.21 filter(condition)

    用给定的条件过滤行。

    where()和filter()类似。

    参数:●  条件 - 一个列的bool类型或字符串的SQL表达式。

    df.where(df.age == 2).collect()

    df.filter(df.age == 2).collect()

    2.22 first()

    返回第一行。

    >>> df.first()

    Row(age=2, name=u'Alice')

    2.23 flatMap(f)

    返回在每行应用F函数后的新的RDD,然后将结果压扁。

    是df.rdd.flatMap()的简写。

    >>>df.rdd.flatMap(lambda p: p.name).collect()

    [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']

    2.24 foreach(f)

    应用f函数到DataFrame的所有行。

    是df.rdd.foreach()的简写。

    def f(person):

         print(person.name)

    >>> df.foreach(f)

    Alice

    2.25 foreachPartition(f)

    应用f函数到DataFrame的每一个分区。

    是 df.rdd.foreachPartition()的缩写。

    >>>def f(people):

    ...    forpersonin people:

    ...        print(person.name)>>> df.foreachPartition(f)

    2.26  freqItems(cols, support=None)

    参数:●  cols – 要计算重复项的列名,为字符串类型的列表或者元祖。

    ●  support – 要计算频率项的频率值。默认是1%。参数必须大于1e-4.

    df.stat.freqItems(['name']).collect()

    2.27 groupBy(*cols)

    使用指定的列分组DataFrame,这样可以聚合计算。可以从GroupedData查看所有可用的聚合方法。

    groupby()和groupBy()类似。

    参数:●  cols – 分组依据的列。每一项应该是一个字符串的列名或者列的表达式。

    df.groupBy(['name', df.age]).count().collect()

    2.28 groupby(*cols)

    和以上一致

    2.29 head(n=None)

    返回前n行

    参数:●  n – int类型,默认为1,要返回的行数。

    返回值: 如果n大于1,返回行列表,如果n为1,返回单独的一行。

    2.30 insertInto(tableName, overwrite=False)

    插入DataFrame内容到指定表。

    注:在1.4中已过时,使用DataFrameWriter.insertInto()代替。

    2.31 intersect(other)

    返回新的DataFrame,包含仅同时在当前框和另一个框的行。

    相当于SQL中的交集。

    df.intersect(df8)

    如果collect()和take()方法可以运行在本地(不需要Spark executors)那么返回True

    2.32 join(other, on=None, how=None)

    使用给定的关联表达式,关联另一个DataFrame。

    以下执行df1和df2之间完整的外连接。

    参数:● other – 连接的右侧

    ● on – 一个连接的列名称字符串, 列名称列表,一个连接表达式(列)或者列的列表。如果on参数是一个字符串或者字符串列表,表示连接列的名称,这些名称必须同时存在join的两个表中, 这样执行的是一个等价连接。

    ● how – 字符串,默认'inner'。inner,outer,left_outer,right_outer,leftsemi之一。

    df.join(df8,on='name',how='inner').show()

    2.33 limit(num)

    将结果计数限制为指定的数字。

    df.limit(1).collect()

    2.34 map(f)

    通过每行应用f函数返回新的RDD。

    是 df.rdd.map()的缩写。

    >>>df.rdd.map(lambda p: p.name).collect()

    2.35 mapPartitions(f, preservesPartitioning=False)

    通过每个分区应用f函数返回新的RDD

    是df.rdd.mapPartitions()的缩写。

    >>>rdd = sc.parallelize([1, 2, 3, 4], 4)

    >>>def f(iterator):yield 1

    >>> rdd.mapPartitions(f).sum() 4

    2.36 na

    返回DataFrameNaFunctions用于处理缺失值

    df.na.drop(how='all').show()

    2.37 orderBy(*cols, **kwargs)

    返回按照指定列排序的新的DataFrame。

    参数:● cols – 用来排序的列或列名称的列表。

    ● ascending – 布尔值或布尔值列表(默认 True). 升序排序与降序排序。指定多个排序顺序的列表。如果指定列表, 列表的长度必须等于列的长度。

    df.orderBy('age',ascending=False).show()

    df.orderBy(['age','height'],ascending=[1,0]).show()

    2.38 persist(storageLevel=StorageLevel(False, True, False, False, 1))

    设置存储级别以在第一次操作运行完成后保存其值。这只能用来分配新的存储级别,如果RDD没有设置存储级别的话。如果没有指定存储级别,默认为(memory_only_ser)。

    from pyspark import StorageLevel

    df.persist(StorageLevel.MEMORY_ONLY)

    print StorageLevel.MEMORY_ONLY

    2.39 printSchema()

    打印schema以树的格式

    df.printSchema()

    splits = rdd.randomSplit([0.7, 0.3],42)

     splits[0].collect(): [0, 1, 2, 3, 4, 5, 7, 9, 10, 12, 13, 14, 16, 18, 19]

    splits[1].collect(): [6, 8, 11, 15, 17]

    2.40 rdd:返回内容为行的RDD。

    2.41 registerAsTable(name): 在1.4中已过时,使用registerTempTable()代替。

    2.42 registerTempTable(name)

    使用给定的名字注册该RDD为临时表

    这个临时表的有效期与用来创建这个DataFrame的SQLContext相关

    df.registerTempTable("people")

     df2 = sqlContext.sql("select * from people")

     df2.show()

    +---+------+-----+

    |age|height| name|

    +---+------+-----+

    |  5|    80|Alice|

    |  5|    80|Alice|

    | 10|    80|Alice|

    +---+------+-----+

    2.43 repartition(numPartitions, *cols)

    按照给定的分区表达式分区,返回新的DataFrame。产生的DataFrame是哈希分区。

    numPartitions参数可以是一个整数来指定分区数,或者是一个列。如果是一个列,这个列会作为第一个分区列。如果没有指定,将使用默认的分区数。

    1.6版本修改: 添加可选参数可以指定分区列。如果分区列指定的话,numPartitions也是可选的。

    2.44 replace(to_replace, value, subset=None)

    返回用另外一个值替换了一个值的新的DataFrame。DataFrame.replace() 和 DataFrameNaFunctions.replace() 类似。

    参数:● to_replace – 整形,长整形,浮点型,字符串,或者列表。要替换的值。如果值是字典,那么值会被忽略,to_replace必须是一个从列名(字符串)到要替换的值的映射。要替换的值必须是一个整形,长整形,浮点型,或者字符串。

    ● value – 整形,长整形,浮点型,字符串或者列表。要替换为的值。要替换为的值必须是一个整形,长整形,浮点型,或者字符串。如果值是列表或者元组,值应该和to_replace有相同的长度。

    ● subset – 要考虑替换的列名的可选列表。在subset指定的列如果没有匹配的数据类型那么将被忽略。例如,如果值是字符串,并且subset参数包含一个非字符串的列,那么非字符串的列被忽略。

    l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]

    df4 = sqlContext.createDataFrame(l4,['name','age','height'])

     df4.replace(10, 20).show() #把 10替换20 加不加na都可以

    df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()+----+----+------+

    2.45 rollup(*cols)

    使用指定的列为当前的DataFrame创建一个多维汇总, 这样可以聚合这些数据。

    In [240]: l=[('Alice',2,80),('Bob',5,None)]

    In [241]: df = sqlContext.createDataFrame(l,['name','age','height'])

    In [242]: df.show()

    +-----+---+------+

    | name|age|height|

    +-----+---+------+

    |Alice|  2|    80|

    |  Bob|  5|  null|

    +-----+---+------+

    In [243]:  df.rollup('name', df.age).count().show()

    +-----+----+-----+

    | name| age|count|

    +-----+----+-----+

    | null|null|    2|

    |  Bob|  5|    1|

    |Alice|  2|    1|

    |  Bob|null|    1|

    |Alice|null|    1|

    +-----+----+-----+

    2.46 sample(withReplacement, fraction, seed=None)

    sample(是否放回, fraction, seed)

    withReplacement:true抽取放回,false抽取不放回。

    fraction:1)false抽取不放回的情况下,抽取的概率(0-1)。0-全不抽1-全抽2)true抽取放回的情况下,抽取的次数。seed:随机数种子。

    返回DataFrame的子集采样。

    >>>df.sample(False, 0.5, 42).count()

    2.47 sampleBy(col, fractions, seed=None)

    根据每个层次上给出的分数,返回没有替换的分层样本。

    返回没有替换的分层抽样 基于每层给定的一小部分 在给定的每层的片段

    参数:● col – 定义层的列

    ● fractions – 每层的抽样数。如果没有指定层, 将其数目视为0.

    ● seed – 随机数

    返回值: 返回代表分层样本的新的DataFrame

    sampled = dataset.sampleBy("key", fractions={0: 1, 1: 0}, seed=0) 表示key取的比例

    2.48 save(path=None, source=None, mode='error', **options)

    保存DataFrame的数据到数据源。

    注:在1.4中已过时,使用DataFrameWriter.save()代替。

    2.49 saveAsParquetFile(path)

    保存内容为一个Parquet文件,代表这个schema。

    注:在1.4中已过时,使用DataFrameWriter.parquet() 代替。

    2.50 saveAsTable(tableName, source=None, mode='error', **options)

    将此DataFrame的内容作为表保存到数据源。

    注:在1.4中已过时,使用DataFrameWriter.saveAsTable() 代替。

    2.51  schema

    返回DataFrame的schema为types.StructType。

    >>>l=[('Alice',2),('Bob',5)]>>>df = sqlContext.createDataFrame(l,['name','age'])>>> df.schema

    StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))

    2.52  select(*cols)

    提供一组表达式并返回一个新的DataFrame。

    参数:●cols – 列名(字符串)或表达式(列)列表。 如果其中一列的名称为“*”,那么该列将被扩展为包括当前DataFrame中的所有列。

    In [275]: df.select('name','age').show()

    +-----+---+

    | name|age|

    +-----+---+

    |Alice|  2|

    |  Bob|  5|

    +-----+---+

    2.53 show(n=20, truncate=True) 

    将前n行打印到控制台。

    参数:●n – 要显示的行数。

    ● truncate – 是否截断长字符串并对齐单元格。

    2.54 sort(*cols, **kwargs)

    返回按指定列排序的新DataFrame。

    参数:●cols – 要排序的列或列名称列表。

    ● ascending – 布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。

    df.sort(df.age.desc()).collect()

    2.55 sortWithinPartitions(*cols, **kwargs)

    返回一个新的DataFrame,每个分区按照指定的列排序。

    参数:●cols – 要排序的列或列名称列表。

    ascending – 布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。

    df.sortWithinPartitions("age", ascending=False).show()

    2.56 stat 

    返回统计功能的DataFrameStatFunctions。

    2.57 subtract(other)

    返回一个新的DataFrame,这个DataFrame中包含的行不在另一个DataFrame中。

    这相当于SQL中的EXCEPT。

    In [278]: df.subtract(df4).show()

    +-----+---+------+

    | name|age|height|

    +-----+---+------+

    |Alice|  2|    80|

    +-----+---+------+

    2.58 take(num)

    返回前num行的行列表

    >>>df.take(2)

    2.59 toDF(*cols)

    返回一个新类:具有新的指定列名称的DataFrame。

    参数:● cols – 新列名列表(字符串)。

    In [280]: df.toDF('a','b','c').show()

    +-----+---+----+

    |    a|  b|  c|

    +-----+---+----+

    |Alice|  2|  80|

    |  Bob|  5|null|

    +-----+---+----+

    In [281]: df.show()

    +-----+---+------+

    | name|age|height|

    +-----+---+------+

    |Alice|  2|    80|

    |  Bob|  5|  null|

    +-----+---+------+

    2.60 toJSON(use_unicode=True)

    将DataFrame转换为字符串的RDD。

    每行都将转换为JSON格式作为返回的RDD中的一个元素。

    In [284]: df.toJSON().take(2)

    Out[284]: [u'{"name":"Alice","age":2,"height":80}', u'{"name":"Bob","age":5}']

    2.61 toPandas()

    将此DataFrame的内容返回为Pandas pandas.DataFrame。

    这只有在pandas安装和可用的情况下才可用。

    表:In [286]: df.toPandas()

    Out[286]:

        name  age  height

    0  Alice    2    80.0

    1    Bob    5    NaN

    2.62 unionAll(other)

    返回包含在这个frame和另一个frame的行的联合的新DataFrame。

    这相当于SQL中的UNION ALL。

    In [297]: df.unionAll(df4).show()

    +-----+----+------+

    | name| age|height|

    +-----+----+------+

    |Alice|  2|    80|

    |  Bob|  5|  null|

    |Alice|  10|    80|

    |  Bob|  5|  null|

    |  Tom|null|  null|

    | null|null|  null|

    +-----+----+------+

    2.63 unpersist(blocking=True)

    将DataFrame标记为非持久性,并从内存和磁盘中删除所有的块。

    2.64 where(condition)

    使用给定表达式过滤行。

    where()是filter()的别名。

    2.65 withColumn(colName, col)

    通过添加列或替换具有相同名称的现有列来返回新的DataFrame。

    参数:● colName – 字符串,新列的名称

    ● col – 新列的列表达式

    >>>df.withColumn('age2', df.age + 2).collect()

    [Row(name=u'Alice', age=2, age2=4), Row(name=u'Bob', age=5, age2=7)]

    2.66 withColumnRenamed(existing, new)

    通过重命名现有列来返回新的DataFrame。

    参数:● existing – 字符串,要重命名的现有列的名称

    ● col – 字符串,列的新名称

    >>>df.withColumnRenamed('age', 'age2').collect()

    [Row(name=u'Alice', age2=2), Row(name=u'Bob', age2=5)]

    2.67 write

    用于将DataFrame的内容保存到外部存储的接口。

    返回:DataFrameWriter

    相关文章

      网友评论

          本文标题:中文文档 pyspark.sql.DataFrame

          本文链接:https://www.haomeiwen.com/subject/ejlruhtx.html