PySpark NoteBook-5

作者: 7125messi | 来源:发表于2018-01-11 21:28 被阅读19次

    dtypes, udf, drop, groupBy, agg, withColumn, dateFormat, select

    import datetime
    from pyspark.sql import Row
    from pyspark.sql.functions import col

    # A tiny demo dataset: daily production counts per person,
    # with dates kept as MM/DD/YYYY strings for now.
    row = Row("date", "name", "production")

    records = [
        ("08/01/2014", "Kim", 5),
        ("08/02/2014", "Kim", 14),
        ("08/01/2014", "Bob", 6),
        ("08/02/2014", "Bob", 3),
        ("08/01/2014", "Sue", 0),
        ("08/02/2014", "Sue", 22),
        ("08/01/2014", "Dan", 4),
        ("08/02/2014", "Dan", 4),
        ("08/01/2014", "Joe", 37),
        ("09/01/2014", "Kim", 6),
        ("09/02/2014", "Kim", 6),
        ("09/01/2014", "Bob", 4),
        ("09/02/2014", "Bob", 20),
        ("09/01/2014", "Sue", 11),
        ("09/02/2014", "Sue", 2),
        ("09/01/2014", "Dan", 1),
        ("09/02/2014", "Dan", 3),
        ("09/02/2014", "Joe", 29),
    ]

    # sc is the SparkContext provided by the pyspark shell/notebook.
    df = sc.parallelize([row(*rec) for rec in records]).toDF()
    df.show()
    +----------+----+----------+
    |      date|name|production|
    +----------+----+----------+
    |08/01/2014| Kim|         5|
    |08/02/2014| Kim|        14|
    |08/01/2014| Bob|         6|
    |08/02/2014| Bob|         3|
    |08/01/2014| Sue|         0|
    |08/02/2014| Sue|        22|
    |08/01/2014| Dan|         4|
    |08/02/2014| Dan|         4|
    |08/01/2014| Joe|        37|
    |09/01/2014| Kim|         6|
    |09/02/2014| Kim|         6|
    |09/01/2014| Bob|         4|
    |09/02/2014| Bob|        20|
    |09/01/2014| Sue|        11|
    |09/02/2014| Sue|         2|
    |09/01/2014| Dan|         1|
    |09/02/2014| Dan|         3|
    |09/02/2014| Joe|        29|
    +----------+----+----------+
    df.dtypes
    [('date', 'string'), ('name', 'string'), ('production', 'bigint')]
    
    from pyspark.sql.functions import udf
    def split_date(whole_date):
        """Reduce a 'MM/DD/YYYY' date string to 'MM/YYYY'.

        Returns the literal string 'error' when the input does not split
        into exactly three '/'-separated fields (kept as a string so the
        result stays a valid value for a string-typed Spark column).
        """
        try:
            # The day component is deliberately discarded.
            month, _, year = whole_date.split('/')
        except ValueError:
            return 'error'
        return month + '/' + year
    
    # Wrap the plain Python function as a Spark UDF (default return
    # type: string) and derive a month/year column from the raw date.
    udf_split_date = udf(split_date)

    df_new = df.withColumn('month_year', udf_split_date(col('date')))
    df_new.show()
    
    +----------+----+----------+----------+
    |      date|name|production|month_year|
    +----------+----+----------+----------+
    |08/01/2014| Kim|         5|   08/2014|
    |08/02/2014| Kim|        14|   08/2014|
    |08/01/2014| Bob|         6|   08/2014|
    |08/02/2014| Bob|         3|   08/2014|
    |08/01/2014| Sue|         0|   08/2014|
    |08/02/2014| Sue|        22|   08/2014|
    |08/01/2014| Dan|         4|   08/2014|
    |08/02/2014| Dan|         4|   08/2014|
    |08/01/2014| Joe|        37|   08/2014|
    |09/01/2014| Kim|         6|   09/2014|
    |09/02/2014| Kim|         6|   09/2014|
    |09/01/2014| Bob|         4|   09/2014|
    |09/02/2014| Bob|        20|   09/2014|
    |09/01/2014| Sue|        11|   09/2014|
    |09/02/2014| Sue|         2|   09/2014|
    |09/01/2014| Dan|         1|   09/2014|
    |09/02/2014| Dan|         3|   09/2014|
    |09/02/2014| Joe|        29|   09/2014|
    +----------+----+----------+----------+
    
    # The exact date is no longer needed once month_year exists.
    df_new = df_new.drop('date')

    # Total production per person per month.
    grouped = df_new.groupBy('month_year', 'name')
    df_agg = grouped.agg({'production': 'sum'})
    df_agg.show()
    +----------+----+---------------+
    |month_year|name|sum(production)|
    +----------+----+---------------+
    |   09/2014| Sue|             13|
    |   09/2014| Kim|             12|
    |   09/2014| Bob|             24|
    |   09/2014| Joe|             29|
    |   09/2014| Dan|              4|
    |   08/2014| Kim|             19|
    |   08/2014| Joe|             37|
    |   08/2014| Dan|              8|
    |   08/2014| Sue|             22|
    |   08/2014| Bob|              9|
    +----------+----+---------------+
    
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DateType
    from datetime import datetime

    # Parse the MM/dd/yyyy strings into real DateType values.
    # BUG FIX: the original pattern was '%M/%d/%Y' — '%M' is *minutes*,
    # so the month defaulted to 1 and "08/01/2014" came back as
    # datetime.date(2014, 1, 1). The month directive is lowercase '%m'.
    dateFormat = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

    df_d = df.withColumn('new_date', dateFormat(col('date')))
    df_d.dtypes
    # [('date', 'string'),
    #  ('name', 'string'),
    #  ('production', 'bigint'),
    #  ('new_date', 'date')]

    df_d.select('new_date').take(1)
    # With the corrected pattern the first row parses as August 1st:
    # [Row(new_date=datetime.date(2014, 8, 1))]
    

    相关文章

      网友评论

        本文标题:PySpark NoteBook-5

        本文链接:https://www.haomeiwen.com/subject/keotoxtx.html