1 A Simple RDD
The dataset used in this article is available at: https://pan.baidu.com/s/1pNfwvSv (password: a3qs)
data = sc.textFile("file:///root/ydzhao/PySpark/data/UserPurchaseHistory.csv") \
    .map(lambda line: line.split(",")) \
    .map(lambda record: (record[0], record[1], record[2]))
numPurchases = data.count()
numPurchases
> 5
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
uniqueUsers
> 4
totalRevenue = data.map(lambda record: float(record[2])).sum()
totalRevenue
> 39.91
products = data.map(lambda record: (record[1], 1)).reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
mostPopular
> ('iPhone Cover', 2)
print ("Total purchases:{0}".format(numPurchases))
print ("Unique users:{0}".format(uniqueUsers))
print ("Total revenue:{0}".format(totalRevenue))
print ("Most popular product:{0} with {1} purchases".format(mostPopular[0], mostPopular[1]))
> Total purchases:5
Unique users:4
Total revenue:39.91
Most popular product:iPhone Cover with 2 purchases
2 Preprocessing Data with RDDs
The preprocessing stage is critical. RDDs are flexible enough to handle it, but working with them directly can get tedious, as the warm-up sketch below suggests.
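Nearly everything that follows repeats one pattern: split each line into fields, then map and filter those fields. A minimal sketch of that pattern (the path and the three-field layout here are hypothetical):
# hypothetical file and layout, just to illustrate split-then-filter
raw = sc.textFile("file:///path/to/data.csv")
records = (raw.map(lambda line: line.split(","))
              .filter(lambda fields: len(fields) == 3))  # drop malformed rows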
2.1 Exploring the User Dataset
user_data = sc.textFile("file:///root/ydzhao/PySpark/data/u.user")
user_data.first()
> '1|24|M|technician|85711'
user_fields = user_data.map(lambda line: line.split("|"))
num_users = user_fields.map(lambda fields: fields[0]).count()
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
print ("Users: {0}, genders: {1}, occupations: {2}, ZIP codes: {3}".format(num_users, num_genders, num_occupations, num_zipcodes))
> Users: 943, genders: 2, occupations: 21, ZIP codes: 795
%matplotlib inline
import matplotlib.pyplot as plt
fig = plt.gcf()
fig.set_size_inches(16, 10)
ages = user_fields.map(lambda x: int(x[1])).collect()
plt.hist(ages, bins=20, color='lightblue', density=True)  # 'normed' was removed in recent matplotlib; density gives the same values here
> (array([ 0.00064269, 0.00192808, 0.00449886, 0.0279572 , 0.02956393,
0.03374144, 0.04563129, 0.02538642, 0.02088756, 0.01863813,
0.02088756, 0.01606735, 0.0170314 , 0.01863813, 0.00674829,
0.00482021, 0.0054629 , 0.00192808, 0.00128539, 0.00128539]),
array([ 7. , 10.3, 13.6, 16.9, 20.2, 23.5, 26.8, 30.1, 33.4,
36.7, 40. , 43.3, 46.6, 49.9, 53.2, 56.5, 59.8, 63.1,
66.4, 69.7, 73. ]),
<a list of 20 Patch objects>)
(figure: histogram of user ages)
import numpy as np
count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
x_axis = x_axis1[np.argsort(y_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
fig = plt.gcf()
fig.set_size_inches(16, 10)
plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
> (array([ 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5,
9.5, 10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5,
18.5, 19.5, 20.5]), <a list of 21 Text xticklabel objects>)
(figure: bar chart of user counts by occupation)
2.2 Exploring the Movie Dataset
movie_data = sc.textFile("file:///root/ydzhao/PySpark/data/u.item")
print (movie_data.first())
num_movies = movie_data.count()
print ("Movies: {0}".format(num_movies))
> 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
> Movies: 1682
def convert_year(x):
    try:
        return int(x[-4:])
    except ValueError:
        # empty or malformed release dates get a sentinel year
        return 1900
movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
# filter out records whose year could not be parsed (the 1900 sentinel)
years_filtered = years.filter(lambda x: x != 1900)
# plot the movie ages histogram
movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()
values = list(movie_ages.values())
bins = sorted(movie_ages.keys())  # dict order is not guaranteed; hist needs increasing bin edges
fig = plt.gcf()
fig.set_size_inches(16, 10)
plt.hist(values, bins=bins, color='lightblue', density=True)
> (array([ 0. , 0.07575758, 0.09090909, 0.09090909, 0.18181818,
0.18181818, 0.04545455, 0.07575758, 0.07575758, 0.03030303,
0. , 0.01515152, 0.01515152, 0.03030303, 0. ,
0.03030303, 0. , 0. , 0. , 0. ,
0. , 0. , 0.01515152, 0. , 0.01515152,
0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0.01515152, 0. , 0. ,
0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. ,
0.01515152, 0. , 0. , 0. , 0. ]),
array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
68, 72, 76]),
<a list of 70 Patch objects>)
(figure: histogram of movie ages)
2.3 Exploring the Rating Dataset
rating_data_raw = sc.textFile("file:///root/ydzhao/PySpark/data/u.data")
print (rating_data_raw.first())
num_ratings = rating_data_raw.count()
print ("Ratings: {0}".format(num_ratings))
> 196 242 3 881250949
> Ratings: 100000
rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
median_rating = np.median(ratings.collect())
ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies
print ("Min rating: {0}".format(min_rating))
print ("Max rating: {0}".format(max_rating))
print ("Average rating: {0:.3}".format(mean_rating))
print ("Median rating: {0}".format(median_rating))
print ("Average # of ratings per user: {0:.5}".format(ratings_per_user))
print ("Average # of ratings per movie:{0:.4}".format(ratings_per_movie))
> Min rating: 1
Max rating: 5
Average rating: 3.53
Median rating: 4.0
Average # of ratings per user: 106.04
Average # of ratings per movie:59.45
ratings.stats()
> (count: 100000, mean: 3.5298600000000024, stdev: 1.12566797076, max: 5.0, min: 1.0)
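Beyond these summary statistics, it can help to see how ratings spread across the 1-5 scale. A minimal sketch using countByValue (output omitted):
# map each rating value to its frequency
count_by_rating = ratings.countByValue()
print (sorted(count_by_rating.items()))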
2.4 Filling in Bad or Missing Values
# NOTE: the filter below already removes the 1900 sentinel, so the masking and
# replacement that follow find nothing to fix; they are kept to illustrate the
# technique (see the sketch after the output for the intended version)
years_pre_processed = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x)).filter(lambda yr: yr != 1900).collect()
years_pre_processed_arr = np.array(years_pre_processed)
mean_year = np.mean(years_pre_processed_arr[years_pre_processed_arr != 1900])
median_year = np.median(years_pre_processed_arr[years_pre_processed_arr != 1900])
print (np.median(years_pre_processed_arr))
print (np.mean(years_pre_processed_arr))
idx_bad_data = np.where(years_pre_processed_arr == 1900)[0]
years_pre_processed_arr[idx_bad_data] = median_year
print ("Mean year of release: {0}".format(mean_year))
print ("Median year of release: {0}".format(median_year))
print ("Index of '1900' after assigning median: {0}".format(np.where(years_pre_processed_arr == 1900)[0]))
Output:
1995.0
1989.38607971
Mean year of release: 1989.3860797144557
Median year of release: 1995.0
Index of '1900' after assigning median: []
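Because the pipeline above filters out the 1900 sentinel before collecting, the index lookup is empty and the median assignment has nothing to replace. A sketch of the fill as intended, collecting the unfiltered years first:
# collect ALL converted years, sentinel included
years_all = np.array(movie_fields.map(lambda fields: convert_year(fields[2])).collect())
median_all = np.median(years_all[years_all != 1900])  # median of the good values only
years_all[years_all == 1900] = median_all             # replace every sentinel in place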
3 Feature Extraction
3.1 Categorical Features: 1-of-k Encoding of User Occupation
all_occupations = user_fields.map(lambda fields: fields[3]).distinct().collect()
all_occupations.sort()
idx = 0
all_occupations_dict = {}
for o in all_occupations:
    all_occupations_dict[o] = idx
    idx += 1
print ("Encoding of 'doctor': {0}".format(all_occupations_dict['doctor']))
print ("Encoding of 'programmer': {0}".format(all_occupations_dict['programmer']))
Output:
Encoding of 'doctor': 2
Encoding of 'programmer': 14
K = len(all_occupations_dict)
binary_x = np.zeros(K)
k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print ("Binary feature vector: {0}".format(binary_x))
print ("Length of binary vector: {0}".format(K))
Output:
Binary feature vector: [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0.
0. 0. 0.]
Length of binary vector: 21
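With 21 occupations a dense array is harmless, but for high-cardinality features a sparse representation saves space. A minimal sketch of the same encoding using MLlib's SparseVector, reusing K and k_programmer from above:
from pyspark.mllib.linalg import SparseVector
# only the single non-zero entry is stored
sparse_x = SparseVector(K, {k_programmer: 1.0})
print (sparse_x)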
3.2 Transforming Timestamps into Categorical Features
# a function to extract the timestamps (in seconds) from the dataset
def extract_datetime(ts):
    # import inside the function so it is available on Spark workers
    import datetime
    return datetime.datetime.fromtimestamp(ts)
timestamps = rating_data.map(lambda fields: int(fields[3]))
hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour)
hour_of_day.take(5)
Output:
[23, 3, 15, 13, 13]
# a function for assigning "time-of-day" bucket given an hour of the day
def assign_tod(hr):
    times_of_day = {
        'morning'  : range(7, 12),
        'lunch'    : range(12, 14),
        'afternoon': range(14, 18),
        'evening'  : range(18, 23),
        # range(23, 7) is empty, which silently mapped hours 23 and 0-6 to
        # None; wrap around midnight explicitly instead
        'night'    : list(range(23, 24)) + list(range(0, 7))
    }
    for k, v in times_of_day.items():
        if hr in v:
            return k
# now apply the "time of day" function to the "hour of day" RDD
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)
Output:
['night', 'night', 'afternoon', 'lunch', 'lunch']
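These buckets are themselves categorical, so they can be index-encoded just like the occupations. A minimal sketch using zipWithIndex (the same trick used for the term dictionary below):
# build a bucket -> index mapping, then encode each rating's time of day
tod_dict = time_of_day.distinct().zipWithIndex().collectAsMap()
time_of_day_idx = time_of_day.map(lambda tod: tod_dict[tod])
print (time_of_day_idx.take(5))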
3.3 Simple Text Feature Extraction (with the re regular-expression module)
# we define a function to extract just the title from the raw movie title, removing the year of release
def extract_title(raw):
    import re
    # match the release year in parentheses, e.g. "(1995)"
    grps = re.search(r"\((\w+)\)", raw)
    if grps:
        # take everything up to the matched year and trim trailing whitespace
        return raw[:grps.start()].strip()
    else:
        return raw
raw_titles = movie_fields.map(lambda fields: fields[1])
for raw_title in raw_titles.take(5):
    print (extract_title(raw_title))
Output:
Toy Story
GoldenEye
Four Rooms
Get Shorty
Copycat
movie_titles = raw_titles.map(lambda m: extract_title(m))
title_terms = movie_titles.map(lambda t: t.split(" "))
print (title_terms.take(5))
> [['Toy', 'Story'], ['GoldenEye'], ['Four', 'Rooms'], ['Get', 'Shorty'], ['Copycat']]
all_terms = title_terms.flatMap(lambda x: x).distinct().collect()
# create a new dictionary to hold the terms, and assign the "1-of-k" indexes
idx = 0
all_terms_dict = {}
for term in all_terms:
    all_terms_dict[term] = idx
    idx += 1
num_terms = len(all_terms_dict)
print ("Total number of terms: {0}".format(num_terms))
print ("Index of term 'Dead': {0}".format(all_terms_dict['Dead']))
print ("Index of term 'Rooms': {0}".format(all_terms_dict['Rooms']))
Output:
Total number of terms: 2645
Index of term 'Dead': 1493
Index of term 'Rooms': 96
# we could also use Spark's 'zipWithIndex' RDD function to create the term dictionary
all_terms_dict2 = title_terms.flatMap(lambda x: x).distinct().zipWithIndex().collectAsMap()
print ("Index of term 'Dead': {0}".format(all_terms_dict2['Dead']))
print ("Index of term 'Rooms': {0}".format(all_terms_dict2['Rooms']))
Output:
Index of term 'Dead': 1493
Index of term 'Rooms': 96
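The dictionary can then turn each title into a sparse bag-of-words vector. A minimal sketch with scipy (create_vector is a hypothetical helper; with a large dictionary you would ship all_terms_dict to the workers via sc.broadcast):
from scipy import sparse as sp
def create_vector(terms, term_dict):
    # map a list of title terms to a 1 x num_terms binary sparse vector
    num_terms = len(term_dict)
    x = sp.lil_matrix((1, num_terms))
    for t in terms:
        if t in term_dict:
            x[0, term_dict[t]] = 1
    return x
term_vectors = title_terms.map(lambda terms: create_vector(terms, all_terms_dict))
print (term_vectors.take(2))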