美文网首页推荐系统
推荐系统之SparkML实现协同过滤ALS

推荐系统之SparkML实现协同过滤ALS

作者: 勇于自信 | 来源:发表于2020-02-23 23:31 被阅读0次

    1.Spark MLlib概述

    1.是什么:spark MLLib 是基于Spark 引擎实现的机器学习算法库,spark开发应用都是基于RDD的,而MLLib是基于底层RDD实现的上层更高级的API
    2.特点:
    扩展性和容错性,扩展性完全是基于Spark底层结构以及Spark运行环境来实现的
    扩展性:可以扩展的机器学习库,每新发布一个版本都会新增一些机器学习算法
    3.算法库:它有一系列的机器学习算法和实用程序组成,目前涵盖了常见的分类、回归、聚类、协同过滤、关联规则等等算法,具体如下:

    • 存储:保存和加载算法、模型及管道
    • 基本统计:概括统计、相关性、分层取样、假设检验、随机数生成
    • 回归包括:线性回归、逻辑回归、岭回归、保序回归。
    • 分类算法:主要包括有贝叶斯分类、线性二元SVM分类、逻辑回归分类、决策树、随机森林、梯度增强树
    • 协同过滤: (ALS) (交替最小二乘法(ALS) )
    • 降维:(SVD) 奇异值分解、 (PCA) 主成分分析
    • 聚类:K-means聚类、LDA主题模型
    • 关联规则:FP-Growth。
    • 特征工程:特征提取、特征转换、特征选择以及降维。
    • 管道:构造、评估和调整的管道的工具。
    • 存储:保存和加载算法、模型及管道
    • 实用工具:线性代数,统计,数据处理等。
    • 优化部分:随机梯度下降、(L-BFGS) 短时记忆的BFGS (拟牛顿法中的一种,解决非线性问题)
    

    在这里,我们只讲推荐应用的ALS协同过滤及其实现。

    2.MLlib协同过滤算法:ALS

    2.1算法原理

    详细讲解:
    1.什么是ALS:ALS直接翻译过来就是交替最小二乘,目光更多关注在交替这两个字上
    2.回顾协同过滤:有基于user-base和item-base的。现在我们希望把这两个角度同时进行考虑,相当于得到的CF不仅可以学到一些用户背后一些隐含的特征,它也可以从物品的角度学习它的隐含特征,所以呢,把这个算法也叫作混合CF,主要是考虑把User和Item两个方向同时考虑进去了。
    我们平常见到的UI矩阵如下图:



    这个矩阵有很多空格,是个稀疏矩阵,假设这是一个M*N的矩阵,即左边是M个User,上边是N个Item的打分矩阵。我们知道,这个矩阵的尺寸是非常大的,因为对于一个中企来说动不动用户量就是上亿了。这里有个好奇的问题,中国大概才有13亿人口,为什么每家企业都会有上亿的用户?因为可能一个人会有多个账号,有可能手机一个账号,PC一个账号,有可能手机有多个微信,有可能不需要注册账号,每个浏览器背后都有一个账号,所以一个人可能对应着多个不同的账户。
    我们面对这么大的数据量呢,其实我们可以把它进行分解,分解成两个部分来组成,即两个小矩阵,一个是UXK的这么一个矩阵,还有一个是KXI这么个矩阵。这两个矩阵中,第一个矩阵的User维度不变,第二个矩阵的Item维度不变,而K可能就比较小,从几维,到几百维,甚至几千维这个尺寸就可以了,甚至再不济给几万维,对于User和Item来说也是小很多的。总之呢,是把UI这个矩阵拆分成UK和KI两个小矩阵相乘的方式。分解之后如下图:



    有了两个小矩阵,对于我们的计算来说,就得到了非常的便利和快速。这里,我们不管去看上边这个矩阵,还是下边这个矩阵,都可以方便地表达每个用户和物品。比如说对于左边的UK矩阵,随便给你个User,你都能在这个矩阵上映射到某一行,而且这一行呢,是由K个维度的向量来组成的,有K个特征,而且每个特征向量都是稠密,不像上边的矩阵向量是稀疏的,即每一个User,都可以用K个向量来表示。右边矩阵同理,我们可以对一个物品通过一个K维的向量来表示。
    假设说,如果user可以用一个向量来表示(当然它是稠密的),Item也可以用一个向量来表示(当然它也是稠密的),分别为Vector A和Vector B。那么这个user对这个item的打分应该如何计算?应该是AXB,相当于这两个向量做一个内积。每个维度相乘之后,应该把结果再相加。即sum(AXB)=AB
    目标

    我们得到了UK和KI两个矩阵,那么其实我们可以用两个小矩阵来还原最原始的UI矩阵,但是我们能完美的100%地还原吗?理论上是可以的,实际上有点难度。实际上我们得到这两个矩阵到底长什么样子,只要是维度定义好就可以,然后这两个矩阵相乘得到一个UI矩阵,比如我们真实的打分矩阵R,R'是预测得到一个矩阵,如果R和R'很相似的话,那么我们是不是可以把R'基本的替代R这个矩阵呢。所以我们的目标是尽可能让两个小矩阵相乘,变成一个大矩阵,这个大矩阵和真实的矩阵很相近就可以了。

    界定方式

    那么相似怎么来界定呢?这里就用误差来表示,两个矩阵的误差越小,代表他们之间越相似,相反越不相似。这个误差叫均方根误差(RMSE),RMSE相当于是两个矩阵,每一个cell之间相减的差别,得到一个距离,然后求平方开根号就可以了。

    目标求解

    我们的目标有了,界定方式也有了,接下来就是求A(UK)和B(KI)的两个小矩阵了。
    如果我们有了A和B,我们可以做什么呢?
    我们可以做物品推荐,就用到B矩阵,item-item=IKKI,同理也可以做用户推荐,就用到A矩阵,user-user=UKKU
    接下来我们就看如何得到A和B,我们知道K值是远小于M和N的,这样才能达到降维的目的。每一个User都有K个特征,每个特征代表什么含义呢,这个你不需要关心,你只需要认为我这个User是可以用这个向量来表示就可以了,每一个物品也可以用一个向量来表示就可以了。至于它的维度已经是个隐含因子,无法解释了。从两个小矩阵,我们得到一个式子:
    R_{m\times n} \approx X_{m\times k }Y_{n\times k}^T
    大写的R代表评分矩阵
    目标,真实矩阵和结果矩阵之间尽可能逼近
    将最小化平方误差作为损失函数:
    \mathop{min}\limits_{x_*,y_*} \sum \limits_{u,i is known}(r_{ui}-x_u^Ty_i)^2


    表示某一个用户对物品的一个打分具体单元值,真实值减预测值,我们希望目标越小越好,另外为了让我们的目标进一步稳定,我们可以引入一些正则项,后面加了一个L2正则:

    最终,得到结果两个矩阵相乘的方式:

    得到矩阵和

    为了使上式的损失最小,我们对损失函数求解最优值,求导=0,这里面有两个未知数:每一个x和y
    x_u求导,得到式子如下:


    另导数=0,得到式子1如下:

    y_i求导,得到式子2如下:


    从一开始就讲到ALS是交替最小二乘法,重点在于交替,在这里,有两个未知数,先固定x值,然后对y做下降,等它下降到一定程度后,再让y固定,让x下降。一直循环下去,直到满足收敛条件(损失最小的条件RMSE)。
    这里的x其实是代表user的向量,y代表item的向量,一开始我们只知道他们是两个向量,不知道它的值是什么,所以一开始我们给它随机初始化一下,随机给它一个向量,即求解步骤如下:
    1. 随机生成X、Y(初始化矩阵)
    2. 固定Y,更新X(公式1)
    3. 固定X,更新Y(公式2)
    4. 第2、3步循环,直到满足收敛条件(RMSE)
    2.2算法实现

    1)基于ALS(alternating least squares)的协同过滤算法,涉及参数如下:

    • numBlocks: 计算并行度(若为-1表示自动化配置)
    • Rank:模型中隐含影响因子,默认是10
    • Iterations:迭代次数,默认是10
    • Lambda:ALS中正则化参数
    • implicitPrefs:是否使用显式反馈变量或使用隐式反馈数据的变量
    • Alpha:ALS中的一个参数,作用于隐式反馈变量,控制基本的信心度
    

    2)构建流如下:

    1. 加载数据集
    2. 将数据集解析成ALS要求的格式
    3. 将数据集分割成两部分:训练集和测试集
    4. 运行ALS,产生并评估模型
    5. 将最终模型用于推荐

    3)采用数据集:使用MovieLens下的ml-1m电影数据集
    格式:



    实现代码:
    1.导入库

    import sqlContext.implicits._
    import org.apache.spark.sql.types._
    import org.apache.spark.mllib.recommendation.{ALS,MatrixFactorizationModel,Rating}
    

    2.定义类及转化rdd格式方法

    case class Movie(movieId:Int, title:String, genres:Seq[String])
    case class User(userId:Int, gender:String, age:Int, occupation:Int, zip:String)
    
    //Define parse function
    def parseMovie(str: String): Movie = {
      val fields=str.split("::")
      assert(fields.size==3)
      Movie(fields(0).toInt, fields(1).toString, Seq(fields(2)))
    }
    def parseUser(str: String): User = {
      val fields=str.split("::")
      assert(fields.size==5)
      User(fields(0).toInt, fields(1).toString, fields(2).toInt, fields(3).toInt, fields(4).toString)
    }
    def parseRating(str: String): Rating = {
      val fields=str.split("::")
      assert(fields.size==4)
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toInt)
    }
    

    3.Ratings 数据简单查看

    val ratingText=sc.textFile("file:/root/data/ratings.dat")
    ratingText.first()
    val ratingRDD=ratingText.map(parseRating).cache()
    println("Total number of ratings: "+ratingRDD.count())
    println("Total number of movies rated: "+ratingRDD.map(_.product).distinct().count())
    println("Total number of users who rated movies: "+ratingRDD.map(_.user).distinct().count())
    

    打印结果如下图:


    4.创建 DataFrames,便于使用sql语句做简单分析

    val ratingDF=ratingRDD.toDF();
    val movieDF=sc.textFile("file:/root/data/movies.dat").map(parseMovie).toDF()
    val userDF=sc.textFile("file:/root/data/users.dat").map(parseUser).toDF()
    ratingDF.printSchema()
    movieDF.printSchema()
    userDF.printSchema()
    ratingDF.registerTempTable("ratings")
    movieDF.registerTempTable("movies")
    userDF.registerTempTable("users")
    

    5.利用sql语句,从rating数据集中找出评价次数最多和最少的物品情况

    val result=sqlContext.sql("""select title,rmax,rmin,ucnt
    from
    (select product, max(rating) as rmax, min(rating) as rmin, count(distinct user) as ucnt
    from ratings
    group by product) ratingsCNT
    join movies on product=movieId
    order by ucnt desc""")
    result.show()
    

    输出结果如下:


    6.利用sql查出前10个用户评价物品的次数

    val mostActiveUser=sqlContext.sql("""select user, count(*) as cnt
    from ratings group by user order by cnt desc limit 10""")
    mostActiveUser.show()
    

    输出结果如下:


    7.查询user=4169的用户,他所有评分里大于4分的评分物品的标题和评分多少

    val result=sqlContext.sql("""select distinct title, rating
    from ratings join movies on movieId=product
    where user=4169 and rating>4""")
    result.show()
    

    输出结果如下:


    8.ALS模型构建

    val splits=ratingRDD.randomSplit(Array(0.8,0.2), 0L)
    val trainingSet=splits(0).cache()
    val testSet=splits(1).cache()
    trainingSet.count()//结果见下方
    testSet.count()//结果见下方
    val model=(new ALS().setRank(20).setIterations(10).run(trainingSet))
    //简单测试模型
    val recomForTopUser=model.recommendProducts(4169,5)
    //打印出简单测试的物品标题
    val movieTitle=movieDF.map(array=>(array(0),array(1))).collectAsMap();
    val recomResult=recomForTopUser.map(rating=>(movieTitle(rating.product),rating.rating)).foreach(println)
    

    trainingSet.count()结果:799809
    testSet.count()结果:200400
    recomResult结果如下:


    9.测试数据准备

    val testUserProduct=testSet.map{
      case Rating(user,product,rating) => (user,product)
    }
    

    10.测试数据代入模型进行预测

    val testUserProductPredict=model.predict(testUserProduct)
    testUserProductPredict.take(10).mkString("\n")
    

    测试结果如下:


    11.使用公式原理,计算平均绝对误差

    val testSetPair=testSet.map{
      case Rating(user,product,rating) => ((user,product),rating)
    }
    val predictionsPair=testUserProductPredict.map{
      case Rating(user,product,rating) => ((user,product),rating)
    }
    
    val joinTestPredict=testSetPair.join(predictionsPair)
    val mae=joinTestPredict.map{
      case ((user,product),(ratingT,ratingP)) => 
      val err=ratingT-ratingP
      Math.abs(err)
    }.mean()
    

    结果如下:



    12.使用公式原理,负样本量(实际为假,预测为真),便于预估准确率

    //FP,ratingT<=1, ratingP>=4
    val fp=joinTestPredict.filter{
      case ((user,product),(ratingT,ratingP)) => 
      (ratingT <=1 & ratingP >=4)
    }
    fp.count()
    

    运行结果:
    res17: Long = 550
    在测试集为20多万的数据下,只有550个用户预测错误,模型准确率还是相对较高。
    12.使用MLlib自带库,计算均方根误差

    import org.apache.spark.mllib.evaluation._
    val ratingTP=joinTestPredict.map{
      case ((user,product),(ratingT,ratingP))=>
      (ratingP,ratingT)
    }
    val evalutor=new RegressionMetrics(ratingTP)
    evalutor.meanAbsoluteError
    evalutor.rootMeanSquaredError
    

    运行结果如下:



    从结果可以看出,平均绝对误差在0.72左右,而均方根有0.94,因为其原理,会比平均绝对误差差距会更大一些。

    相关文章

      网友评论

        本文标题:推荐系统之SparkML实现协同过滤ALS

        本文链接:https://www.haomeiwen.com/subject/ncreqhtx.html