Reading and Writing MongoDB Data with Spark and Computing Statistics

Author: 枫隐_5f5f | Published 2019-05-04 12:59
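
The script below reads the ratings and movies collections from a local MongoDB, computes four statistics with Spark SQL and the RDD API, and writes each result back to a Statistic_rec database. It is written for Python 2 and relies on the MongoDB Spark connector, which does not ship with Spark itself, so the job has to be submitted with the connector package on the classpath. A minimal sketch of the launch command (the script name mongo_statistics.py and the connector version are assumptions; pick the artifact that matches your Spark and Scala build):

spark-submit \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.2 \
    mongo_statistics.py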
import sys
reload(sys)                      # Python 2 only: reload re-exposes setdefaultencoding
sys.setdefaultencoding("utf-8")  # default to UTF-8 for str/unicode conversions

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
import datetime
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def transform_time(timestamp):
    # Convert a raw Unix timestamp into a "YYYYMM" year-month string
    return datetime.datetime.utcfromtimestamp(float(timestamp)).strftime("%Y%m")
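# For example, 1556928000 is 2019-05-04 00:00 UTC:
#   transform_time("1556928000")  ->  "201905"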

def filter_func(row):
    # row is a (genre, movie Row) pair from the cartesian product below;
    # keep it when the genre name appears in the movie's FILM_TPYE field
    genres_str = row[1].FILM_TPYE.encode("utf-8").lower()
    return row[0].lower() in genres_str

def sort_func(k, v):
    # v is an iterable of (genre, MOVIE_ID, Avg_score) tuples for one genre;
    # sort by average score descending and keep the top 10
    return sorted(v, key=lambda x: x[2], reverse=True)[:10]



if __name__ == "__main__":
    spark = SparkSession.builder \
        .master("local") \
        .appName("Dataloader") \
        .getOrCreate()
    
    
    mongo_read_uri = "mongodb://127.0.0.1:27017/raw_data"
    mongo_Statistic_uri = "mongodb://127.0.0.1:27017/Statistic_rec"
    rating_table = "ratings"
    movies_table = "movies"
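    # Read the ratings collection from MongoDB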
    ratings_df = spark.read \
            .option("uri",mongo_read_uri) \
            .option("collection",rating_table) \
            .format("com.mongodb.spark.sql") \
            .load()
        
    transform_time_udf = func.udf(transform_time,StringType())
    ratings_df = ratings_df.withColumn("YearMonth",transform_time_udf("TIMESTAMP")) 
    
    
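    # Read the movies collection from MongoDB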
    movies_df = spark.read \
            .option("uri",mongo_read_uri) \
            .option("collection",movies_table) \
            .format("com.mongodb.spark.sql") \
            .load()
    
    
    ratings_df.createOrReplaceTempView("ratings")
    movies_df.createOrReplaceTempView("movies")
    
    # Count the total number of ratings per movie
    RateMoreMovies = spark.sql("select MOVIE_ID, count(MOVIE_ID) as Count from ratings group by MOVIE_ID")
    RateMoreMovies.write \
        .format("com.mongodb.spark.sql") \
        .option("uri",mongo_Statistic_uri) \
        .option("collection","RateMoreMovies") \
        .mode("overwrite") \
        .save()

    # Count ratings per movie per month
    RateMoreRecentlyMovies = spark.sql("select MOVIE_ID, YearMonth, count(MOVIE_ID) as Count from ratings group by YearMonth, MOVIE_ID")
    RateMoreRecentlyMovies.write \
                .format("com.mongodb.spark.sql") \
                .option("uri",mongo_Statistic_uri) \
                .option("collection","RateMoreRecentlyMovies") \
                .mode("overwrite") \
                .save()

    # Average rating score per movie
    AverageMovies = spark.sql("select MOVIE_ID, avg(RATING_SCORE) as Avg_score from ratings group by MOVIE_ID")
    AverageMovies.write \
        .format("com.mongodb.spark.sql") \
        .option("uri",mongo_Statistic_uri) \
        .option("collection","AverageMovies") \
        .mode("overwrite") \
        .save()

    # Top 10 movies per genre
    movieswithScore = movies_df.join(AverageMovies,"MOVIE_ID")
    genres = ["Action","Adventure","Animals","Comedy","Crime","Documentary","Drama","Family","Romance","Science","Tv","Thriller","War","Western"]
    genres_rdd = spark.sparkContext.parallelize(genres)
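    # Pair every genre with every movie (cartesian product), keep the pairs
    # whose genre occurs in the movie's FILM_TPYE field, reshape each survivor
    # to (genre, MOVIE_ID, Avg_score), group by genre, and keep the 10
    # best-scoring movies in each group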
    re_rdd = genres_rdd.cartesian(movieswithScore.rdd) \
        .filter(filter_func) \
        .map(lambda row: (row[0], row[1].MOVIE_ID, row[1].Avg_score)) \
        .groupBy(lambda x: x[0]) \
        .map(lambda kv: sort_func(kv[0], kv[1])) \
        .flatMap(lambda tu: tu)

    # Avg_score comes from avg() and is a double; declaring it as a string
    # would fail createDataFrame's schema verification. MOVIE_ID is assumed
    # to be a string here; adjust if your collection stores it as a number.
    schema_label = [
        ("Genres", StringType()),
        ("MOVIE_ID", StringType()),
        ("Avg_score", DoubleType())
    ]

    schema = StructType([StructField(e[0],e[1],False) for e in schema_label])
    GenresTopMovies = spark.createDataFrame(re_rdd,schema)
    
    GenresTopMovies.write \
        .format("com.mongodb.spark.sql") \
        .option("uri",mongo_Statistic_uri) \
        .option("collection","GenresTopMovies") \
        .mode("overwrite") \
        .save()

    #stop spark
    spark.stop()
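
Once the job finishes, the Statistic_rec database can be inspected to confirm the four collections were written. A minimal sketch using pymongo (an assumption; any MongoDB client works, and list_collection_names requires pymongo 3.6+):

from pymongo import MongoClient

client = MongoClient("mongodb://127.0.0.1:27017")
stats = client["Statistic_rec"]
print(stats.list_collection_names())  # expect the four collections written above
for doc in stats["GenresTopMovies"].find({"Genres": "Action"}).limit(3):
    print(doc)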
