美文网首页
【记录|Spark】简单的电影推荐系统

【记录|Spark】简单的电影推荐系统

作者: XXX被用了 | 来源:发表于2017-10-29 11:09 被阅读0次

    为了学习spark,在实验楼上找到的一个spark入门课程,在此记录一下学习过程。

    我使用的Spark版本为Spark 2.2.0, 实验楼教程使用的是Spark 1.6.1

    流程和算法介绍

    这个简单的电影推荐系统是根据已有用户对电影的评价系统,针对特定用户输出其可能会感兴趣的电影,构成一个简单的电影推荐系统。

    主要步骤

    • 加载数据集,解析成特定格式
    • 划分数据集,分为训练集和测试集
    • 利用交替最小二乘法(ALS)算法,训练用户与电影之间的矩阵模型
    • 基于训练集进行预测,利用测试集来验证预测结果是否有效。

    实际上,上述步骤的第三四步是使用了协同过滤算法来推荐电影。
    引用知乎上的回答解释协同过滤

    举个简单的小例子
    我们已知道用户u1喜欢的电影是A,B,C
    用户u2喜欢的电影是A, C, E, F
    用户u3喜欢的电影是B,D
    我们需要解决的问题是:决定对u1是不是应该推荐F这部电影。
    基于内容的做法:要分析F的特征和u1所喜欢的A、B、C的特征,需要知道的信息是A(战争片),B(战争片),C(剧情片),如果F(战争片),那么F很大程度上可以推荐给u1,这是基于内容的做法,你需要对item进行特征建立和建模。

    协同过滤的办法:那么你完全可以忽略item的建模,因为这种办法的决策是依赖user和item之间的关系,也就是这里的用户和电影之间的关系。我们不再需要知道ABCF哪些是战争片,哪些是剧情片,我们只需要知道用户u1和u2按照item向量表示,他们的相似度比较高,那么我们可以把u2所喜欢的F这部影片推荐给u1。

    在Spark MLlib中,协同过滤算法是通过交替最小二乘法(ALS)实现的,具体算法实现在此并不关注。

    数据集

    数据集来自GroupLens,是一个名为MovieLens的数据集的数据,在此处选择数据量为一百万条的数据集,下载地址

    具体代码和分析

    1.导入包

    我们需要导入以下包

    import org.apache.spark.rdd._
    import org.apache.spark.sql._
    import org.apache.spark.mllib.recommendation.Rating
    import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    

    mllib包是Spark中的机器学习包,我们这次导入的有ALS,MatrixFactorizationModel,Rating。ALS即为上文提到的交替最小二乘算法,在Spark中ALS算法的返回结果为MatrixFactorizationModel类,最后的Rating是Spark定义的评价Model,对应于我们数据中的Rating.dat中的内容,不用用户再自行定义

    然后,我们还需要导入implicits包,这个是Spark中的隐式转换包,可以自动地对一些数据类型进行转换,但是这个包需要在代码中动态导入

     val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
    import spark.implicits._
    

    其中spark为SparkSession类,在Spark 2.2.0中用来代替SparkContext,作为整个程序的入口点

    2.数据处理

    • 定义电影、用户数据实体类,用来映射对应的数据
    case class Movie(movieId: Int, title: String)
    case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
    
    • 定义解析函数,将数据从文件中解析出来
    def parseMovieData(data: String): Movie = {
            val dataField = data.split("::")
            assert(dataField.size == 3)
            Movie(dataField(0).toInt, dataField(1))
        }
    
    def parseUserData(data: String): User = {
            val dataField = data.split("::")
            assert(dataField.size == 5)
            User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
        }
    
    def parseRatingData(data: String): Rating = {
            val dataField = data.split("::")
            Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
        }
    
    
    • 导入数据
    var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData).cache()
            
    var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData).cache()
    
    var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData).cache()
    

    3. 训练模型

    // convert to DataFrame
    val moviesDF = moviesData.toDF()
    val usersDF = usersData.toDF()
    val ratingsDF = ratingsData.toDF()
    
    // split to data set and test set
    val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
    val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
    val testSetOfRatingData = tempPartitions(1).cache().rdd
    
    // training model
    val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)
    

    按7:3的比例将数据集分为训练集和验证集,由于划分出来的数据集为DataSet类型,而ALS算法的run函数接收的参数为RDD类型,所以需要将DataSet转换为RDD,方法很简单,就加上”.rdd"就可以了,如果不转换会报错


    spark_error_5.PNG

    训练完之后可以调用模型进行推荐,比如要给用户ID为1000的用户推荐适合TA看的10部电影,就可以执行

    val recomResult = recomModel.recommendProducts(1000, 10)
    

    结果如下

    运行结果

    返回的结果包括用户ID,电影ID,和对应的相关性
    如果我们要显示电影名,可以执行以下代码

    val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()
    val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
    println(recommendMoviesWithTitle.mkString("\n"))
    

    在Spark老版本中,可以直接使用

    val movieTitles = moviesDF.map(array => (array(0), array(1))).collectAsMap()
    

    将moviesDF转换为key为电影ID,value为电影名的map,但是在2.2.0中,如果这样写会提示DataSet没有collectAsMap()方法,错误截图如下

    错误截图

    经过一番搜索后,在StackOverflow上有人提到RDD有collectAsMap()方法,于是就要将moviesDF转换为RDD类型,即上文用到的方法

    打印出来的结果如图

    转换结果

    4.验证模型

    如何知道模型是否正确呢?可以用之前从数据集里面划分出来的验证集,通过调用模型得出预测结果,与验证集中的原数据进行对比,可以判断模型的效果如何

    val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
        case Rating(user, product, rating) => (user, product)
    })
    val formatResultOfTestSet = testSetOfRatingData.map{
        case Rating(user, product, rating) => ((user, product), rating)
    }
    val formatResultOfPredictionResult = predictResultOfTestSet.map {
        case Rating(user, product, rating) => ((user, product), rating)
    }
    val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
    val MAE = finalResultForComparison.map {
        case ((user, product), (ratingOfTest, ratingOfPrediction)) => 
        val error = (ratingOfTest - ratingOfPrediction)
        Math.abs(error)
    }.mean()
    

    在得到测试集的预测评分结果之后,我们用 map 操作和 join 操作将它与测试集的原始数据组合成为 ((用户ID, 电影ID), (测试集原有评分, 预测评分))的格式。这个格式是 Key-Value 形式的,Key 为 (user, product)。我们是要把这里的测试集原有评分与预测时得到的评分相比较,二者的联系就是 user 和 product 相同。

    上述代码中首先调用模型进行预测,然后将在测试集上的预测结果和测试集本身的数据都转换为 ((user,product), rating) 的格式,之后将两个数据组合在一起,计算两者之间的评价的差值的绝对值,然后求平均值,这种方法叫做计算平均绝对误差

    平均绝对误差( Mean Absolute Error )是所有单个观测值与算术平均值偏差的绝对值的平均。
    与平均误差相比,平均绝对误差由于离差被绝对值化,不会出现正负相抵消的情况,所以平均绝对误差能更好地反映预测值误差的实际情况。

    最终算出的结果为


    image.png

    效果还算可以,如果想继续优化可以通过增加ALS的迭代次数和特征矩阵的秩来提高准确率

    完整代码

    import org.apache.spark.rdd._
    import org.apache.spark.sql._
    import org.apache.spark.mllib.recommendation.Rating
    import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    
    
    object PredictMovie {
    
        case class Movie(movieId: Int, title: String)
        case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
    
    
        def parseMovieData(data: String): Movie = {
            val dataField = data.split("::")
            assert(dataField.size == 3)
            Movie(dataField(0).toInt, dataField(1))
        }
    
        def parseUserData(data: String): User = {
            val dataField = data.split("::")
            assert(dataField.size == 5)
            User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
        }
    
        def parseRatingData(data: String): Rating = {
            val dataField = data.split("::")
            Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
        }
    
        def main(args: Array[String]){
            
            val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
            import spark.implicits._
    
            var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData _).cache()
            
            var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData _).cache()
            var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData _).cache()
            
            // convert to DataFrame
            val moviesDF = moviesData.toDF()
            val usersDF = usersData.toDF()
            val ratingsDF = ratingsData.toDF()
    
            // split to data set and test set
            val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
            val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
            val testSetOfRatingData = tempPartitions(1).cache().rdd
    
            // training model
            val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)
    
            val recomResult = recomModel.recommendProducts(1000, 10)
            println(s"Recommend Movie to User ID 1000")
            println(recomResult.mkString("\n"))
    
            val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()
    
            val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
            println(recommendMoviesWithTitle.mkString("\n"))
    
            val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
                case Rating(user, product, rating) => (user, product)
            })
    
            val formatResultOfTestSet = testSetOfRatingData.map{
                case Rating(user, product, rating) => ((user, product), rating)
            }
    
            val formatResultOfPredictionResult = predictResultOfTestSet.map {
                case Rating(user, product, rating) => ((user, product), rating)
            }
    
            val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
    
            val MAE = finalResultForComparison.map {
                case ((user, product), (ratingOfTest, ratingOfPrediction)) => 
                val error = (ratingOfTest - ratingOfPrediction)
                Math.abs(error)
            }.mean()
    
            println(s"mean error: $MAE")
            spark.stop()
        }
    
    
    }
    

    相关文章

      网友评论

          本文标题:【记录|Spark】简单的电影推荐系统

          本文链接:https://www.haomeiwen.com/subject/lvqhpxtx.html