美文网首页
Spark常用函数

Spark常用函数

作者: nlpming | 来源:发表于2020-09-13 10:43 被阅读0次

    1. 启动pyspark shell

    pyspark --queue xxx
    

    2. rdd 常用函数

    sortByKey函数、sortBy函数
    • 功能说明:根据key或者value对数据进行排序;
      (1)sortByKey 根据key进行排序;
      (2)sortBy 可根据key或value进行排序;
      (3)ascending 参数用于设定升序或者降序;
      (4)注意: sort之后需要用coalesce分区保存数据,repartition会把数据打乱;
    • 具体用法:
    rdd.sortByKey().first()
    rdd.sortByKey(True, 3)
    rdd.sortBy(lambda x: x[0]).collect()
    rdd.sortBy(lambda x: x[1]).collect()
    
    reduceByKey函数
    • 功能说明:合并具有相同key的数据;
    • 具体用法:
    from operator import add
    
    rdd.reduceByKey(lambda x,y: x+y)
    rdd.reduceByKey(add)
    
    flatMap函数
    • 功能说明:对rdd中的每个元素map操作,然后flat出来;
    • 具体用法:
    rdd.flatMap(lambda x: range(1,x))
    
    def test(items):
      for item in items:
        yield item+1
    rdd.flatMap(test)
    
    mapPartitions函数
    • 功能说明:将整个分区的数据全部读进来一起操作,即 mapPartitions 的输入是一个list;
    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
    
    groupByKey函数
    • 功能说明:对rdd进行分组,相同key的数据归为一组;
    • 具体用法:
    rdd.groupByKey().mapValues(list)   # 或者通过item.data得到list
    
    cogroup函数
    • 功能说明:首先分别将x,y中相同key group在一起,然后join x,y中相同的key;
    • 具体用法:
    cogroup函数.png
    keyBy函数
    • 功能说明:创建一个元组,定义某个函数f生成key;
    • 具体用法:
    >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
    >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
    >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
    [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
    
    collectAsMap函数
    • 功能说明:将rdd中的数据collect出来,rdd中元组第一个元素为key,第二个元素为value;
    • 具体用法:
    >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
    >>> m[1]
    2
    >>> m[3]
    4
    
    join, leftOuterJoin函数
    • 功能说明:与sql中的join, leftouterjoin类似;
    • 具体用法:
    # join函数的使用
    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]
    
    # leftOuterJoin函数的使用
    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(x.leftOuterJoin(y).collect())
    [('a', (1, 2)), ('b', (4, None))]
    
    coalesce, repartition函数
    • 功能说明:将rdd中的数据重新分区;repartition会将原来的数据打乱分区,coalesce会保留原来的数据顺序不变;
    • 具体用法:注意,在保存数据的时候需要使用repartition/coalesce对数据进行分区;
    >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    >>> sorted(rdd.glom().collect())
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> len(rdd.repartition(2).glom().collect())
    2
    >>> len(rdd.repartition(10).glom().collect())
    10
    
    sample函数
    • 功能说明:对rdd中的数据进行随机采样;
    • 参数说明:
      (1)第一个参数:withReplacement 代表一个数据是否能被采样多次;
      (2)第二个参数:fraction 代表数据采样的比例;
    • 具体用法:
    >>> rdd = sc.parallelize(range(100), 4)
    >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
    True
    

    3. DataFrame常用函数

    pyspark.sql.DataFrame.rdd 函数
    • 功能说明:将DataFrame转化成rdd,注意:访问使用x.key
    pyspark.sql.Row 函数
    • 功能说明:DataFrame中的一行
    • 具体使用:
    # 将rdd转成DataFrame,rdd中存放的是字典;
    rdd.map(lambda x: Row(**x)).toDF()
    
    pyspark.sql.DataFrame.select 函数
    • 功能说明:选择DataFrame中的某些列,生成一个新的DataFrame;
    • 具体使用:
    >>> df.select('*').collect()
    [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    >>> df.select('name', 'age').collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
    
    pyspark.sql.DataFrame.show 函数
    • 功能说明:显示DataFrame中的内容;
    • 具体使用:
    >>> df
    DataFrame[age: int, name: string]
    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> df.show(2, truncate=3)
    +---+----+
    |age|name|
    +---+----+
    |  2| Ali|
    |  5| Bob|
    +---+----+
    
    pyspark.sql.DataFrame.agg 函数
    • 功能说明:对数据进行聚合操作;
    • 具体使用:
    # 求某列的最大值、最小值
    >>> df.agg({"age": "max"}).collect()
    [Row(max(age)=5)]
    >>> from pyspark.sql import functions as F
    >>> df.agg(F.min(df.age)).collect()
    [Row(min(age)=2)]
    
    pyspark.sql.DataFrame.withColumn 函数
    • 功能说明:在DataFrame中新增一列;
    • 具体使用:
    >>> df.withColumn('age2', df.age + 2).collect()
    [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
    
    pyspark.sql.DataFrame.withColumnRenamed 函数
    • 功能说明:对某列进行重命名;
    • 具体使用:
    >>> df.withColumnRenamed('age', 'age2').collect()
    [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
    
    pyspark.sql.Column.like 函数
    • 功能说明:DataFrame中某一列近似匹配;
    • 具体使用:
    >>> df.filter(df.name.like('Al%')).collect()
    [Row(age=2, name=u'Alice')]
    
    pyspark.sql.Column.isNull、pyspark.sql.Column.isNotNull函数
    • 功能说明:判断列中的内容是否为空;
    • 具体使用:
    # isNotNull 函数
    >>> from pyspark.sql import Row
    >>> df2 = sc.parallelize([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]).toDF()
    >>> df2.filter(df2.height.isNotNull()).collect()
    [Row(height=80, name=u'Tom')]
    
    # isNull 函数
    >>> from pyspark.sql import Row
    >>> df2 = sc.parallelize([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]).toDF()
    >>> df2.filter(df2.height.isNull()).collect()
    [Row(height=None, name=u'Alice')]
    
    pyspark.sql.functions.row_number函数
    • 功能说明:给DataFrame中的行加上序号;
    • 具体用法:
    df.withColumn("No", row_number().over(Window.orderBy(desc("pa ratio"))))
    
    pyspark.sql.functions.udf 函数
    • 功能说明:用户自定义函数,用于满足特定的需求;
    • 具体用法:
    >>> from pyspark.sql.types import IntegerType
    >>> slen = udf(lambda s: len(s), IntegerType())
    >>> :udf
    ... def to_upper(s):
    ...     if s is not None:
    ...         return s.upper()
    ...
    >>> :udf(returnType=IntegerType())
    ... def add_one(x):
    ...     if x is not None:
    ...         return x + 1
    ...
    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
    +----------+--------------+------------+
    |slen(name)|to_upper(name)|add_one(age)|
    +----------+--------------+------------+
    |         8|      JOHN DOE|          22|
    +----------+--------------+------------+
    
    pyspark.sql.Window 函数
    • 功能说明:定义窗口函数,用于对DataFrame进行操作;
      参考:How Do Spark Window Functions Work? A Practical Guide to PySpark Window Functions | PySpark Tutorial
      https://www.youtube.com/watch?v=wH6RaEWuiDQ
    对DataFrame进行简单统计分析
    # 简单统计分析
    df.groupBy("age").count().orderBy("count", ascending=False).show()
    
    # 稍复杂操作
    from pyspark.sql import SparkSession, Row, Window
    from pyspark.sql.functions import row_number, desc, asc, min, max
    
    df_sum = df.groupBy("xxx") \
        .agg({"'xxx": "sum", "xxx": "count"})
        
    df_out = df_sum.withColumn("road ratio", 100*df_sum["count(xxx)"]/100959) \
        .withColumn("pa ratio", 100*df_sum["sum(xxx)"]/3723430) \
        .withColumn("No", row_number().over(Window.orderBy(desc("pa ratio")))) \
        .withColumnRenamed("xxx", "xxx") \
        .withColumnRenamed("count(xxx)", "xxx") \
        .withColumnRenamed("sum(xxx)", "xxx")
        
    df_out.show()
    
    df_out.filter(df_out["序号"] <= 100).agg(min("xxx"), max("xxx")).show()
    

    参考资料

    相关文章

      网友评论

          本文标题:Spark常用函数

          本文链接:https://www.haomeiwen.com/subject/ohhisktx.html