Case 2: RDD Data Cleaning

Author: 7125messi | Published 2018-01-12 11:10

    1 A Simple RDD

    The dataset used in this article: https://pan.baidu.com/s/1pNfwvSv (password: a3qs)

    data = sc.textFile("file:///root/ydzhao/PySpark/data/UserPurchaseHistory.csv").\
        map(lambda line: line.split(",")).\
        map(lambda record: (record[0], record[1], record[2]))
    
    numPurchases = data.count()
    numPurchases
    > 5
    
    uniqueUsers = data.map(lambda record: record[0]).distinct().count()
    uniqueUsers
    > 4
    
    totalRevenue = data.map(lambda record: float(record[2])).sum()
    totalRevenue
    > 39.91
    
    products = data.map(lambda record: (record[1], 1)).reduceByKey(lambda a, b: a + b).collect()
    mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
    mostPopular
    > ('iPhone Cover', 2)
    
    print ("Total purchases:{0}".format(numPurchases))
    print ("Unique users:{0}".format(uniqueUsers))
    print ("Total revenue:{0}".format(totalRevenue))
    print ("Most popular product:{0} with {1} purchases".format(mostPopular[0], mostPopular[1]))
    > Total purchases:5
    Unique users:4
    Total revenue:39.91
    Most popular product:iPhone Cover with 2 purchases
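    Splitting on a bare comma is fine for this five-row file, but it breaks as soon as a field contains a quoted comma. A more defensive variant (a sketch; parse_line is a hypothetical helper, not part of the original code) parses each line with Python's csv module:

    import csv
    from io import StringIO

    def parse_line(line):
        # csv.reader handles quoted fields such as "iPhone Cover, Red"
        return next(csv.reader(StringIO(line)))

    data = sc.textFile("file:///root/ydzhao/PySpark/data/UserPurchaseHistory.csv") \
        .map(parse_line) \
        .map(lambda record: (record[0], record[1], record[2]))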
    

    2 Preprocessing Data with RDDs

    The preprocessing stage is critical. RDDs are flexible and powerful, but working with them directly can be tedious.

    2.1 Exploring the User Dataset

    user_data = sc.textFile("file:///root/ydzhao/PySpark/data/u.user")
    user_data.first()
    > '1|24|M|technician|85711'
    
    user_fields = user_data.map(lambda line: line.split("|"))
    num_users = user_fields.map(lambda fields: fields[0]).count()
    num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
    num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
    num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
    print ("Users: {0}, genders: {1}, occupations: {2}, ZIP codes: {3}".format(num_users, num_genders, num_occupations, num_zipcodes))
    > Users: 943, genders: 2, occupations: 21, ZIP codes: 795
    
    %matplotlib inline
    import matplotlib.pyplot as plt
    fig = plt.gcf()
    fig.set_size_inches(16, 10)
    
    ages = user_fields.map(lambda x: int(x[1])).collect()
    plt.hist(ages, bins=20, color='lightblue', density=True)  # 'normed' was removed in newer matplotlib; 'density' is the replacement
    
    > (array([ 0.00064269,  0.00192808,  0.00449886,  0.0279572 ,  0.02956393,
             0.03374144,  0.04563129,  0.02538642,  0.02088756,  0.01863813,
             0.02088756,  0.01606735,  0.0170314 ,  0.01863813,  0.00674829,
             0.00482021,  0.0054629 ,  0.00192808,  0.00128539,  0.00128539]),
     array([  7. ,  10.3,  13.6,  16.9,  20.2,  23.5,  26.8,  30.1,  33.4,
             36.7,  40. ,  43.3,  46.6,  49.9,  53.2,  56.5,  59.8,  63.1,
             66.4,  69.7,  73. ]),
     <a list of 20 Patch objects>)
    
    [Figure: histogram of user ages]

    import numpy as np
    count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
    x_axis1 = np.array([c[0] for c in count_by_occupation])
    y_axis1 = np.array([c[1] for c in count_by_occupation])
    x_axis = x_axis1[np.argsort(y_axis1)]
    y_axis = y_axis1[np.argsort(y_axis1)]
    
    pos = np.arange(len(x_axis))
    width = 1.0
    
    ax = plt.axes()
    ax.set_xticks(pos + (width / 2))
    ax.set_xticklabels(x_axis)
    
    fig = plt.gcf()
    fig.set_size_inches(16, 10)
    plt.bar(pos, y_axis, width, color='lightblue')
    plt.xticks(rotation=30)
    > (array([  0.5,   1.5,   2.5,   3.5,   4.5,   5.5,   6.5,   7.5,   8.5,
              9.5,  10.5,  11.5,  12.5,  13.5,  14.5,  15.5,  16.5,  17.5,
             18.5,  19.5,  20.5]), <a list of 21 Text xticklabel objects>)
    
    [Figure: bar chart of user counts by occupation]
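    The sorting can also stay inside Spark instead of NumPy; a minimal sketch of the same occupation counts using the RDD sortBy transformation:

    # count per occupation, sorted by count descending, all on the Spark side
    count_by_occupation_sorted = user_fields.map(lambda fields: (fields[3], 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .sortBy(lambda pair: pair[1], ascending=False) \
        .collect()
    # count_by_occupation_sorted[0] is the most common occupation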

    2.2 Exploring the Movie Dataset

    movie_data = sc.textFile("file:///root/ydzhao/PySpark/data/u.item")
    print (movie_data.first())
    num_movies = movie_data.count()
    print ("Movies: {0}".format(num_movies))
    
    > 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
    
    > Movies: 1682
    
    def convert_year(x):
        try:
            return int(x[-4:])  # the release year is the last 4 characters of the date field
        except ValueError:
            return 1900  # placeholder for a missing/malformed year; filtered out below
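
    The magic value 1900 has to be remembered and filtered out everywhere downstream. An alternative sketch (convert_year_or_none is hypothetical, not from the original post) that uses None as the sentinel reads more explicitly:

    def convert_year_or_none(x):
        try:
            return int(x[-4:])
        except ValueError:
            return None  # missing or malformed release date

    # years = movie_fields.map(lambda fields: fields[2]).map(convert_year_or_none)
    # years_filtered = years.filter(lambda yr: yr is not None)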
    
    movie_fields = movie_data.map(lambda lines: lines.split("|"))
    years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
    
    # filter out the 1900 placeholder used for bad/missing years
    years_filtered = years.filter(lambda x: x != 1900)
    
    # plot the movie ages histogram
    movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()
    values = movie_ages.values()
    bins = movie_ages.keys()
    fig = plt.gcf()
    fig.set_size_inches(16,10)
    plt.hist(list(values), bins=sorted(bins), color='lightblue', density=True)  # bin edges must increase monotonically; 'normed' is now 'density'
    
    > (array([ 0.        ,  0.07575758,  0.09090909,  0.09090909,  0.18181818,
             0.18181818,  0.04545455,  0.07575758,  0.07575758,  0.03030303,
             0.        ,  0.01515152,  0.01515152,  0.03030303,  0.        ,
             0.03030303,  0.        ,  0.        ,  0.        ,  0.        ,
             0.        ,  0.        ,  0.01515152,  0.        ,  0.01515152,
             0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
             0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
             0.        ,  0.        ,  0.01515152,  0.        ,  0.        ,
             0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
             0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
             0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
             0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
             0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
             0.01515152,  0.        ,  0.        ,  0.        ,  0.        ]),
     array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
            17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
            34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
            68, 72, 76]),
     <a list of 70 Patch objects>)
    
    [Figure: histogram of movie ages (years since release, relative to 1998)]
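    A quick numeric summary of the same movie ages is available without plotting (a sketch using the stats() action that also appears in section 2.3):

    # count, mean, stdev, max and min of the movie ages in one pass
    movie_ages_stats = years_filtered.map(lambda yr: 1998 - yr).stats()
    print (movie_ages_stats)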

    2.3 Exploring the Rating Dataset

    rating_data_raw = sc.textFile("file:///root/ydzhao/PySpark/data/u.data")
    print (rating_data_raw.first())
    num_ratings = rating_data_raw.count()
    print ("Ratings: {0}".format(num_ratings))
    > 196   242 3   881250949
    
    > Ratings: 100000
    
    rating_data = rating_data_raw.map(lambda line: line.split("\t"))
    ratings = rating_data.map(lambda fields: int(fields[2]))
    
    max_rating = ratings.reduce(lambda x, y: max(x, y))
    min_rating = ratings.reduce(lambda x, y: min(x, y))
    mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
    median_rating = np.median(ratings.collect())
    
    ratings_per_user = num_ratings / num_users
    ratings_per_movie = num_ratings / num_movies
    
    print ("Min rating: {0}".format(min_rating))
    print ("Max rating: {0}".format(max_rating))
    print ("Average rating: {0:.3}".format(mean_rating))
    print ("Median rating: {0}".format(median_rating))
    print ("Average # of ratings per user: {0:.5}".format(ratings_per_user))
    print ("Average # of ratings per movie:{0:.4}".format(ratings_per_movie))
    
    > Min rating: 1
    Max rating: 5
    Average rating: 3.53
    Median rating: 4.0
    Average # of ratings per user: 106.04
    Average # of ratings per movie: 59.45
    
    ratings.stats()
    > (count: 100000, mean: 3.5298600000000024, stdev: 1.12566797076, max: 5.0, min: 1.0)
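
    The per-user figure above is only a global average; the actual distribution comes from a reduceByKey over user IDs (a small sketch on top of rating_data):

    # (user_id, number_of_ratings) pairs
    user_rating_counts = rating_data.map(lambda fields: (int(fields[0]), 1)) \
        .reduceByKey(lambda x, y: x + y)
    user_rating_counts.take(5)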
    

    2.4 Filling in Bad or Missing Values

    years_pre_processed = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x)).filter(lambda yr: yr != 1900).collect()
    years_pre_processed_arr = np.array(years_pre_processed)
    mean_year = np.mean(years_pre_processed_arr[years_pre_processed_arr != 1900])
    median_year = np.median(years_pre_processed_arr[years_pre_processed_arr != 1900])
    
    print (np.median(years_pre_processed_arr))
    print (np.mean(years_pre_processed_arr))
    # note: the filter above already removed the 1900 placeholders, so this replacement
    # is a no-op here; on unfiltered data it would patch the bad entries in place
    idx_bad_data = np.where(years_pre_processed_arr == 1900)[0]
    years_pre_processed_arr[idx_bad_data] = median_year
    print ("Mean year of release: {0}".format(mean_year))
    print ("Median year of release: {0}".format(median_year))
    print ("Index of '1900' after assigning median: {0}".format(np.where(years_pre_processed_arr == 1900)[0]))
    
    Output:
    1995.0
    1989.38607971
    Mean year of release: 1989.3860797144557
    Median year of release: 1995.0
    Index of '1900' after assigning median: []
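
    Collecting everything into a NumPy array works for 1,682 movies, but on a large dataset the replacement would be done inside Spark. A hedged sketch, assuming median_year has been computed as above:

    # ship the median to the workers once, then patch bad years in a map
    median_bc = sc.broadcast(int(median_year))
    years_filled = movie_fields.map(lambda fields: convert_year(fields[2])) \
        .map(lambda yr: median_bc.value if yr == 1900 else yr)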
    

    3 Feature Extraction

    Categorical Features: 1-of-k Encoding of User Occupation

    all_occupations = user_fields.map(lambda fields: fields[3]).distinct().collect()
    all_occupations.sort()
    
    idx = 0
    all_occupations_dict = {}
    for o in all_occupations:
        all_occupations_dict[o] = idx
        idx +=1
    print ("Encoding of 'doctor': {0}".format(all_occupations_dict['doctor']))
    print ("Encoding of 'programmer': {0}".format(all_occupations_dict['programmer']))
    
    Output:
    Encoding of 'doctor': 2
    Encoding of 'programmer': 14
    
    K = len(all_occupations_dict)
    binary_x = np.zeros(K)
    k_programmer = all_occupations_dict['programmer']
    binary_x[k_programmer] = 1
    print ("Binary feature vector: {0}".format(binary_x))
    print ("Length of binary vector: {0}".format(K))
    
    Output:
    Binary feature vector: [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  1.  0.  0.  0.
      0.  0.  0.]
    Length of binary vector: 21
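
    The lookup generalizes to any occupation with a small helper (encode_occupation is illustrative, built on the all_occupations_dict above):

    def encode_occupation(occupation):
        vec = np.zeros(len(all_occupations_dict))
        vec[all_occupations_dict[occupation]] = 1
        return vec

    # e.g. turn every user's occupation into a 1-of-k vector
    occupation_vectors = user_fields.map(lambda fields: encode_occupation(fields[3]))
    occupation_vectors.take(1)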
    

    Transforming Timestamps into Categorical Features

    # a function to convert a Unix timestamp (in seconds) into a datetime object
    def extract_datetime(ts):
        import datetime
        return datetime.datetime.fromtimestamp(ts)
        
    timestamps = rating_data.map(lambda fields: int(fields[3]))
    hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour)
    hour_of_day.take(5)
    Output:
    [23, 3, 15, 13, 13]
    
    # a function for assigning a "time-of-day" bucket given an hour of the day
    def assign_tod(hr):
        times_of_day = {
                    'morning' : range(7, 12),
                    'lunch' : range(12, 14),
                    'afternoon' : range(14, 18),
                    'evening' : range(18, 23),
                    # range(23, 7) is empty, which silently mapped hours 23 and 0-6 to None;
                    # the 'night' bucket has to wrap around midnight
                    'night' : list(range(23, 24)) + list(range(0, 7))
                    }
        for k, v in times_of_day.items():
            if hr in v:
                return k
    
    # now apply the "time of day" function to the "hour of day" RDD
    time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
    time_of_day.take(5)
    Output:
    ['night', 'night', 'afternoon', 'lunch', 'lunch']
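
    The buckets themselves can be indexed with the same dictionary trick, here via zipWithIndex (the same pattern used for title terms below):

    # assign each time-of-day bucket a stable integer index
    tod_dict = time_of_day.distinct().zipWithIndex().collectAsMap()
    tod_indices = time_of_day.map(lambda tod: tod_dict[tod])
    tod_indices.take(5)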
    

    Simple Text Feature Extraction (using Python's re module)

    # we define a function to extract just the title from the raw movie title, removing the year of release
    def extract_title(raw):
        import re
        grps = re.search(r"\((\w+)\)", raw)  # match a "(1995)"-style year; the raw string avoids invalid-escape warnings
        if grps:
            return raw[:grps.start()].strip()
        else:
            return raw
    
    raw_titles = movie_fields.map(lambda fields: fields[1])
    for raw_title in raw_titles.take(5):
        print (extract_title(raw_title))
    
    Output:
    Toy Story
    GoldenEye
    Four Rooms
    Get Shorty
    Copycat
    
    movie_titles = raw_titles.map(lambda m: extract_title(m))
    title_terms = movie_titles.map(lambda t: t.split(" "))
    print (title_terms.take(5))
    > [['Toy', 'Story'], ['GoldenEye'], ['Four', 'Rooms'], ['Get', 'Shorty'], ['Copycat']]
    
    all_terms = title_terms.flatMap(lambda x: x).distinct().collect()
    # create a new dictionary to hold the terms, and assign the "1-of-k" indexes
    idx = 0
    all_terms_dict = {}
    for term in all_terms:
        all_terms_dict[term] = idx
        idx +=1
    num_terms = len(all_terms_dict)
    print ("Total number of terms: {0}".format(num_terms))
    print ("Index of term 'Dead': {0}".format(all_terms_dict['Dead']))
    print ("Index of term 'Rooms': {0}".format(all_terms_dict['Rooms']))
    
    Output:
    Total number of terms: 2645
    Index of term 'Dead': 1493
    Index of term 'Rooms': 96
    
    # we could also use Spark's 'zipWithIndex' RDD function to create the term dictionary
    all_terms_dict2 = title_terms.flatMap(lambda x: x).distinct().zipWithIndex().collectAsMap()
    print ("Index of term 'Dead': {0}".format(all_terms_dict2['Dead']))
    print ("Index of term 'Rooms': {0}".format(all_terms_dict2['Rooms']))
    
    Output:
    Index of term 'Dead': 1493
    Index of term 'Rooms': 96
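
    With the term dictionary built, each title can be mapped to a sparse 1-of-k term vector. A minimal sketch with SciPy, broadcasting the dictionary so every worker shares one copy (create_vector is illustrative, not from the original post):

    from scipy import sparse as sp

    all_terms_bcast = sc.broadcast(all_terms_dict)

    def create_vector(terms, term_dict):
        x = sp.csc_matrix((1, len(term_dict)))  # 1 x num_terms sparse row vector
        for t in terms:
            if t in term_dict:
                x[0, term_dict[t]] = 1
        return x

    term_vectors = title_terms.map(lambda terms: create_vector(terms, all_terms_bcast.value))
    term_vectors.take(1)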
    
