# Tags: dtypes, udf, drop, groupBy, agg, withColumn, dateFormat, select
import datetime
from pyspark.sql import Row
from pyspark.sql.functions import col

# Sample production log: (MM/DD/YYYY date string, worker name, units produced).
row = Row("date", "name", "production")
_records = [
    ("08/01/2014", "Kim", 5),
    ("08/02/2014", "Kim", 14),
    ("08/01/2014", "Bob", 6),
    ("08/02/2014", "Bob", 3),
    ("08/01/2014", "Sue", 0),
    ("08/02/2014", "Sue", 22),
    ("08/01/2014", "Dan", 4),
    ("08/02/2014", "Dan", 4),
    ("08/01/2014", "Joe", 37),
    ("09/01/2014", "Kim", 6),
    ("09/02/2014", "Kim", 6),
    ("09/01/2014", "Bob", 4),
    ("09/02/2014", "Bob", 20),
    ("09/01/2014", "Sue", 11),
    ("09/02/2014", "Sue", 2),
    ("09/01/2014", "Dan", 1),
    ("09/02/2014", "Dan", 3),
    ("09/02/2014", "Joe", 29),
]
# Distribute the records and let Spark infer the schema from the Row fields.
df = sc.parallelize([row(*rec) for rec in _records]).toDF()
df.show()
+----------+----+----------+
| date|name|production|
+----------+----+----------+
|08/01/2014| Kim| 5|
|08/02/2014| Kim| 14|
|08/01/2014| Bob| 6|
|08/02/2014| Bob| 3|
|08/01/2014| Sue| 0|
|08/02/2014| Sue| 22|
|08/01/2014| Dan| 4|
|08/02/2014| Dan| 4|
|08/01/2014| Joe| 37|
|09/01/2014| Kim| 6|
|09/02/2014| Kim| 6|
|09/01/2014| Bob| 4|
|09/02/2014| Bob| 20|
|09/01/2014| Sue| 11|
|09/02/2014| Sue| 2|
|09/01/2014| Dan| 1|
|09/02/2014| Dan| 3|
|09/02/2014| Joe| 29|
+----------+----+----------+
df.dtypes  # inspect inferred column types — 'date' is still a plain string here
[('date', 'string'), ('name', 'string'), ('production', 'bigint')]
from pyspark.sql.functions import udf
def split_date(whole_date):
    """Reduce a 'MM/DD/YYYY' date string to its 'MM/YYYY' month-year part.

    Returns the literal string 'error' on malformed input, so that a
    Spark UDF wrapping this function never raises on a bad row.

    Original code only caught ValueError (wrong number of '/'-separated
    fields); a null row passes None into the UDF and .split would raise
    an uncaught AttributeError — now handled the same way.
    """
    try:
        mo, day, yr = whole_date.split('/')
    except (ValueError, AttributeError):
        # ValueError: not exactly three fields; AttributeError: None / non-string.
        return 'error'
    return mo + '/' + yr
# Wrap the plain-Python splitter as a Spark UDF (default return type: string).
udf_split_date = udf(split_date)
# Derive a 'month_year' column from 'date'; the original column is kept.
df_new = df.withColumn('month_year', udf_split_date('date'))
df_new.show()
+----------+----+----------+----------+
| date|name|production|month_year|
+----------+----+----------+----------+
|08/01/2014| Kim| 5| 08/2014|
|08/02/2014| Kim| 14| 08/2014|
|08/01/2014| Bob| 6| 08/2014|
|08/02/2014| Bob| 3| 08/2014|
|08/01/2014| Sue| 0| 08/2014|
|08/02/2014| Sue| 22| 08/2014|
|08/01/2014| Dan| 4| 08/2014|
|08/02/2014| Dan| 4| 08/2014|
|08/01/2014| Joe| 37| 08/2014|
|09/01/2014| Kim| 6| 09/2014|
|09/02/2014| Kim| 6| 09/2014|
|09/01/2014| Bob| 4| 09/2014|
|09/02/2014| Bob| 20| 09/2014|
|09/01/2014| Sue| 11| 09/2014|
|09/02/2014| Sue| 2| 09/2014|
|09/01/2014| Dan| 1| 09/2014|
|09/02/2014| Dan| 3| 09/2014|
|09/02/2014| Joe| 29| 09/2014|
+----------+----+----------+----------+
# The full date is no longer needed once 'month_year' exists.
df_new = df_new.drop('date')
# Total production per worker per month; dict form yields a 'sum(production)' column.
df_agg = df_new.groupBy('month_year', 'name').agg({'production' : 'sum'})
df_agg.show()
+----------+----+---------------+
|month_year|name|sum(production)|
+----------+----+---------------+
| 09/2014| Sue| 13|
| 09/2014| Kim| 12|
| 09/2014| Bob| 24|
| 09/2014| Joe| 29|
| 09/2014| Dan| 4|
| 08/2014| Kim| 19|
| 08/2014| Joe| 37|
| 08/2014| Dan| 8|
| 08/2014| Sue| 22|
| 08/2014| Bob| 9|
+----------+----+---------------+
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from datetime import datetime
# Parse 'MM/DD/YYYY' strings into real dates.
# BUG FIX: the original pattern was '%M/%d/%Y' — '%M' is *minutes*, not month,
# so the month always fell back to its default of 1 (the pasted transcript
# below shows date(2014, 1, 1) for an August row). '%m' is the zero-padded
# month directive; the first row now parses as datetime.date(2014, 8, 1).
dateFormat = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df_d = df.withColumn('new_date', dateFormat(col('date')))
# The UDF's DateType return annotation makes 'new_date' a true date column.
df_d.dtypes
[('date', 'string'),
('name', 'string'),
('production', 'bigint'),
('new_date', 'date')]
df_d.select('new_date').take(1)
# NOTE(review): date(2014, 1, 1) below is the symptom of parsing with '%M'
# (minutes) instead of '%m' (month) — the correct value for the first row
# ('08/01/2014') is datetime.date(2014, 8, 1).
[Row(new_date=datetime.date(2014, 1, 1))]
# User comments