import sys

# Python 2 only: force the process-wide default encoding to UTF-8 so implicit
# str<->unicode coercions (e.g. in filter_func) don't raise UnicodeDecodeError.
# BUGFIX: previously unguarded -- `reload` is not a builtin on Python 3, so the
# script crashed with NameError before doing any work. No-op on Python 3,
# where str is already unicode.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf-8")

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
import datetime
from pyspark.sql.types import *
def transform_time(timestamp):
    """Convert an epoch timestamp (string or number) to a 'YYYYMM' UTC bucket.

    Used as a Spark UDF to derive the YearMonth column from TIMESTAMP.
    """
    moment = datetime.datetime.utcfromtimestamp(float(timestamp))
    return moment.strftime("%Y%m")
def filter_func(row):
    """Predicate for RDD.filter: keep (genre, movie) pairs whose movie matches.

    `row` is a (genre_string, movie_Row) pair produced by a cartesian join;
    a pair is kept when the genre appears (case-insensitively) inside the
    movie's FILM_TPYE field.

    BUGFIX: previously encoded FILM_TPYE to UTF-8 bytes before comparing,
    which is unnecessary on Python 2 and a TypeError (str in bytes) on
    Python 3, and returned the row / implicit None instead of a boolean.

    NOTE(review): 'FILM_TPYE' looks like a typo for FILM_TYPE, but it must
    match the MongoDB collection's actual field name -- confirm before renaming.
    """
    genre = row[0].lower()
    film_types = row[1].FILM_TPYE.lower()
    return genre in film_types
def sort_func(k, v):
    """Return the top-10 tuples from `v`, sorted by score (index 2) descending.

    `k` is the groupBy key (the genre); it is unused but kept so the call
    sites that pass (key, values) pairs keep working.  `v` is any iterable of
    (genre, movie_id, avg_score) tuples.

    Previously copied `v` into a list with a manual loop before sorting;
    `sorted` accepts any iterable directly.
    """
    return sorted(v, key=lambda t: t[2], reverse=True)[:10]
if __name__ == "__main__":
    spark = SparkSession.builder \
        .master("local") \
        .appName("Dataloader") \
        .getOrCreate()

    # Source (raw data) and sink (computed statistics) MongoDB databases.
    mongo_read_uri = "mongodb://127.0.0.1:27017/raw_data"
    mongo_Statistic_uri = "mongodb://127.0.0.1:27017/Statistic_rec"
    rating_table = "ratings"
    movies_table = "movies"

    ratings_df = spark.read \
        .option("uri", mongo_read_uri) \
        .option("collection", rating_table) \
        .format("com.mongodb.spark.sql") \
        .load()

    # Derive a "YYYYMM" bucket from the raw epoch TIMESTAMP column.
    transform_time_udf = func.udf(transform_time, StringType())
    ratings_df = ratings_df.withColumn("YearMonth", transform_time_udf("TIMESTAMP"))

    movies_df = spark.read \
        .option("uri", mongo_read_uri) \
        .option("collection", movies_table) \
        .format("com.mongodb.spark.sql") \
        .load()

    ratings_df.createOrReplaceTempView("ratings")
    movies_df.createOrReplaceTempView("movies")

    # Total rating count per movie.
    RateMoreMovies = spark.sql(
        "select MOVIE_ID, count(MOVIE_ID) as Count from ratings group by MOVIE_ID")
    RateMoreMovies.write \
        .format("com.mongodb.spark.sql") \
        .option("uri", mongo_Statistic_uri) \
        .option("collection", "RateMoreMovies") \
        .mode("overwrite") \
        .save()

    # Monthly rating count per movie.
    # BUGFIX: the count column was unaliased, so it landed in Mongo under the
    # literal name "count(MOVIE_ID)"; alias it "Count" for consistency with
    # the RateMoreMovies collection above.
    RateMoreRecentlyMovies = spark.sql(
        "select MOVIE_ID, YearMonth, count(MOVIE_ID) as Count from ratings "
        "group by YearMonth, MOVIE_ID")
    RateMoreRecentlyMovies.write \
        .format("com.mongodb.spark.sql") \
        .option("uri", mongo_Statistic_uri) \
        .option("collection", "RateMoreRecentlyMovies") \
        .mode("overwrite") \
        .save()

    # Average rating score per movie.
    AverageMovies = spark.sql(
        "select MOVIE_ID, avg(RATING_SCORE) as Avg_score from ratings group by MOVIE_ID")
    AverageMovies.write \
        .format("com.mongodb.spark.sql") \
        .option("uri", mongo_Statistic_uri) \
        .option("collection", "AverageMovies") \
        .mode("overwrite") \
        .save()

    # Top-10 movies per genre, ranked by average score.
    movieswithScore = movies_df.join(AverageMovies, "MOVIE_ID")
    genres = ["Action", "Adventure", "Animals", "Comedy", "Crime", "Documentary",
              "Drama", "Family", "Romance", "Science", "Tv", "Thriller", "War",
              "Western"]
    genres_rdd = spark.sparkContext.parallelize(genres)
    # Pair every genre with every movie, keep pairs whose FILM_TPYE contains the
    # genre, group by genre, and keep the 10 highest-scoring movies per group.
    # BUGFIX: `.map(lambda (k, v): ...)` used Python-2-only tuple-parameter
    # syntax (SyntaxError on Python 3); index the pair explicitly instead.
    re_rdd = genres_rdd.cartesian(movieswithScore.rdd) \
        .filter(filter_func) \
        .map(lambda row: (row[0], row[1].MOVIE_ID, row[1].Avg_score)) \
        .groupBy(lambda x: x[0]) \
        .map(lambda kv: sort_func(kv[0], kv[1])) \
        .flatMap(lambda tu: tu)
    # NOTE(review): all three output fields are declared StringType, but
    # Avg_score comes from avg() (a float) and MOVIE_ID may be numeric in the
    # source collection -- confirm against the schema; createDataFrame can
    # reject values that don't match the declared types.
    schema_label = [
        ("Genres", StringType()),
        ("MOVIE_ID", StringType()),
        ("Avg_score", StringType())
    ]
    schema = StructType([StructField(name, dtype, False) for name, dtype in schema_label])
    GenresTopMovies = spark.createDataFrame(re_rdd, schema)
    GenresTopMovies.write \
        .format("com.mongodb.spark.sql") \
        .option("uri", mongo_Statistic_uri) \
        .option("collection", "GenresTopMovies") \
        .mode("overwrite") \
        .save()

    # Release the Spark session and its resources.
    spark.stop()
# (stray scraped text "网友评论" ["user comments"] -- not Python code; kept as a comment to avoid a SyntaxError)