美文网首页
Spark MLlib ALS 推荐系统

Spark MLlib ALS 推荐系统

作者: 博弈史密斯 | 来源:发表于2018-06-06 16:35 被阅读0次

Spark 机器学习库从 1.2 版本以后被分为两个包:

  • spark.mllib包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。
  • spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。

从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。因此,我们将以ml包为主进行介绍。

一个典型的机器学习过程从数据收集开始,要经历多个步骤,才能得到需要的输出。这非常类似于流水线式工作,即通常会包含源数据ETL(抽取、转化、加载),数据预处理,指标提取,模型训练与交叉验证,新数据预测等步骤。

在介绍工作流之前,我们先来了解几个重要概念:

  • DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。 较之 RDD,包含了 schema 信息,更类似传统数据库中的二维表格。它被 ML Pipeline 用来存储源数据。例如,DataFrame中的列可以是存储的文本,特征向量,真实标签和预测的标签等。

  • Transformer:翻译成转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个 Transformer。它可以把 一个不包含预测标签的测试数据集 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame。技术上,Transformer实现了一个方法transform(),它通过附加一个或多个列将一个DataFrame转换为另一个DataFrame。

  • Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。

  • Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

  • PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

工作流如何工作

要构建一个 Pipeline工作流,首先需要定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。比如:

val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…))

然后就可以把训练数据集作为输入参数,调用 Pipeline 实例的 fit 方法来开始以流的方式来处理源训练数据。这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签。更具体的说,工作流的各个阶段按顺序运行,输入的DataFrame在它通过每个阶段时被转换。 对于Transformer阶段,在DataFrame上调用transform()方法。 对于估计器阶段,调用fit()方法来生成一个转换器(它成为PipelineModel的一部分或拟合的Pipeline),并且在DataFrame上调用该转换器的transform()方法。


ml-pipeline

上面,顶行表示具有三个阶段的流水线。 前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。 底行表示流经管线的数据,其中圆柱表示DataFrames。 在原始DataFrame上调用Pipeline.fit()方法,它具有原始文本文档和标签。 Tokenizer.transform()方法将原始文本文档拆分为单词,向DataFrame添加一个带有单词的新列。 HashingTF.transform()方法将字列转换为特征向量,向这些向量添加一个新列到DataFrame。 现在,由于LogisticRegression是一个Estimator,Pipeline首先调用LogisticRegression.fit()产生一个LogisticRegressionModel。 如果流水线有更多的阶段,则在将DataFrame传递到下一个阶段之前,将在DataFrame上调用LogisticRegressionModel的transform()方法。

SparkSession是spark2.0的全新切入点,用以替代 sparkcontext ,StreamingContext,sqlContext,HiveContext。

直接隐式反馈。

al Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true). 
    setUserCol("userId").setItemCol("movieId").setRatingCol("rating")

​ 在 ML 中的实现有如下的参数:

  • numBlocks 是用于并行化计算的用户和商品的分块个数 (默认为10)。
  • rank 是模型中隐义因子的个数(默认为10)。
  • maxIter 是迭代的次数(默认为10)。
  • regParam 是ALS的正则化参数(默认为1.0)。
  • implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本(默认是false,即用显性反馈)。
  • alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准(默认为1.0)。
  • nonnegative 决定是否对最小二乘法使用非负的限制(默认为false)。

可以调整这些参数,不断优化结果,使均方差变小。比如:maxIter越大,regParam越 小,均方差会越小,推荐结果较优。

  • userCol:用户列的名字,String类型。对应于后续调用fit()操作时输入的- Dataset<Row>入参时用户id所在schema中的name
  • itemCol:item列的名字,String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时item id所在schema中的name
  • ratingCol:rating列的名字,String类型。对应于后续调用fit()操作时输入的Dataset<Row>入参时rating值所在schema中的name
  • predictionCol:String类型。做transform()操作时输出的预测值在Dataset<Row>结果的schema中的name,默认是“prediction”
  • coldStartStrategy:String类型。有两个取值"nan" or "drop"。这个参数指示用在prediction阶段时遇到未知或者新加入的user或item时的处理策略。尤其是在交叉验证或者生产场景中,遇到没有在训练集中出现的user/item id时。"nan"表示对于未知id的prediction结果为NaN。"drop"表示对于transform()的入参DataFrame中出现未知ids的行,将会在包含prediction的返回DataFrame中被drop。默认值是"nan"

接下来,把推荐模型放在训练数据上训练:
val modelImplicit = alsImplicit.fit(training)

使用训练好的推荐模型对测试集中的用户商品进行预测评分,得到预测评分的数据集:
val predictionsImplicit = modelImplicit.transform(test)

结果:

predictionsImplicit.show()
 
+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+-----------+
|    13|     31|   1.0|1424380312| 0.33150947|
|     5|     31|   1.0|1424380312|-0.24669354|
|    24|     31|   1.0|1424380312|-0.22434244|
|    29|     31|   1.0|1424380312| 0.15776125|
|     0|     31|   1.0|1424380312| 0.51940984|
|    28|     85|   1.0|1424380312| 0.88610375|
|    13|     85|   1.0|1424380312| 0.15872183|
|    20|     85|   2.0|1424380312| 0.64086926|
|     4|     85|   1.0|1424380312|-0.06314563|
|     8|     85|   5.0|1424380312|  0.2783457|
|     7|     85|   4.0|1424380312|  0.1618208|
|    29|     85|   1.0|1424380312|-0.19970453|
|    19|     65|   1.0|1424380312| 0.11606887|
|     4|     65|   1.0|1424380312|0.068018675|
|     2|     65|   1.0|1424380312| 0.28533924|
|    12|     53|   1.0|1424380312| 0.42327875|
|    20|     53|   3.0|1424380312| 0.17345423|
|    19|     53|   2.0|1424380312| 0.33321634|
|     8|     53|   5.0|1424380312| 0.10090684|
|    23|     53|   1.0|1424380312| 0.06724724|
+------+-------+------+----------+-----------+
only showing top 20 rows                   
模型评估

​ 通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确:

val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating"). 
    setPredictionCol("prediction")

val rmseImplicit = evaluator.evaluate(predictionsImplicit)
//  rmseImplicit: Double = 1.8011620822359165

可以看到打分的均方差值为1.69和1.80左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。

使用推荐系统

ALS fit 方法返回 ALSModel,其有 recommendForAllUsers(numItems: Int) 和 recommendForAllItems(numUsers: Int) 方法,用于推荐。

例子
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)

相关文章

网友评论

      本文标题:Spark MLlib ALS 推荐系统

      本文链接:https://www.haomeiwen.com/subject/xaybsftx.html