
Hadoop - Spark & PySpark

Author: Xiangyuan_Ren | Published 2018-01-02 15:28

    Spark - Concept:

    • Components:
    • Python vs. Scala

    RDD Concept:

    • SparkContext:
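      As a minimal sketch (the app name and the HDFS path are placeholders), a SparkContext is created from a SparkConf and then used to build RDDs:

    from pyspark import SparkConf, SparkContext

    # Describe the application, then create the SparkContext
    # (the entry point for creating RDDs). "MyApp" is just an illustrative name.
    conf = SparkConf().setAppName("MyApp")
    sc = SparkContext(conf=conf)

    # The SparkContext can now create RDDs from a Python collection or a file
    numbers = sc.parallelize([1, 2, 3, 4])
    lines = sc.textFile("hdfs:///some/path/u.data")   # placeholder path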
    • Transforming RDDs
    • Example:

    rdd = sc.parallelize([1, 2, 3, 4])
    squareRDD = rdd.map(lambda x: x * x)
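      Beyond map, a few other common transformations work the same way (filter and flatMap are standard RDD methods; the data here is only illustrative):

    evenRDD = rdd.filter(lambda x: x % 2 == 0)   # keep even numbers -> [2, 4]
    wordsRDD = sc.parallelize(["to be", "or not"]).flatMap(lambda s: s.split())
    # flatMap splits each line and flattens the result -> ['to', 'be', 'or', 'not']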

    • Lazy evaluation:
      Nothing is computed until an action is called
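      For example, the squareRDD defined above is not actually computed when map is called; Spark only runs the job once an action such as collect() or count() is invoked (a minimal sketch):

    # The map above only records the transformation; these actions trigger the jobs
    print(squareRDD.collect())   # action: runs the computation -> [1, 4, 9, 16]
    print(squareRDD.count())     # action: runs another job -> 4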

    • Config:
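      A minimal sketch of setting configuration through SparkConf (the master URL and memory value are examples only; use your own cluster's settings):

    conf = (SparkConf()
            .setAppName("WorstMovies")
            .setMaster("local[*]")                 # example: run locally using all cores
            .set("spark.executor.memory", "1g"))   # example setting; tune for your cluster
    sc = SparkContext(conf=conf)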
    • PySpark -> Worst Movie:

    from pyspark import SparkConf, SparkContext
    
    # This function just creates a Python "dictionary" we can later
    # use to convert movie ID's to movie names while printing out
    # the final results.
    def loadMovieNames():
        movieNames = {}
        with open("ml-100k/u.item") as f:
            for line in f:
                fields = line.split('|')
                movieNames[int(fields[0])] = fields[1]
        return movieNames
    
    # Take each line of u.data and convert it to (movieID, (rating, 1.0))
    # This way we can then add up all the ratings for each movie, and
    # the total number of ratings for each movie (which lets us compute the average)
    def parseInput(line):
        fields = line.split()
        return (int(fields[1]), (float(fields[2]), 1.0))
    
    if __name__ == "__main__":
        # The main script - create our SparkContext
        conf = SparkConf().setAppName("WorstMovies")
        sc = SparkContext(conf = conf)
    
        # Load up our movie ID -> movie name lookup table
        movieNames = loadMovieNames()
    
        # Load up the raw u.data file
        lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    
        # Convert to (movieID, (rating, 1.0))
        movieRatings = lines.map(parseInput)
    
        # Reduce to (movieID, (sumOfRatings, totalRatings))
        ratingTotalsAndCount = movieRatings.reduceByKey(lambda r1, r2: (r1[0] + r2[0], r1[1] + r2[1]))
    
        # Map to (movieID, averageRating)
        averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])
    
        # Sort by average rating
        sortedMovies = averageRatings.sortBy(lambda x: x[1])
    
        # Take the first 10 results (the lowest-rated movies)
        results = sortedMovies.take(10)
    
        # Print them out:
        for result in results:
            print(movieNames[result[0]], result[1])
    

    Then Submit:

    spark-submit LowestRatedMovie.py
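    On a real cluster you would typically pass resource options as well; the flags below are standard spark-submit options, with purely illustrative values:

    spark-submit --master yarn --num-executors 2 --executor-memory 1g LowestRatedMovie.py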

    Result:

    ('3 Ninjas: High Noon At Mega Mountain (1998)', 1.0)
    ('Beyond Bedlam (1993)', 1.0)
    ('Power 98 (1995)', 1.0)
    ('Bloody Child, The (1996)', 1.0)
    ('Amityville: Dollhouse (1996)', 1.0)
    ('Babyfever (1994)', 1.0)
    ('Homage (1995)', 1.0)
    ('Somebody to Love (1994)', 1.0)
    ('Crude Oasis, The (1995)', 1.0)
    ('Every Other Weekend (1990)', 1.0)

Original link: https://www.haomeiwen.com/subject/cabfnxtx.html