美文网首页程序员
sparkMllib的ALS过滤算法调校参数

sparkMllib的ALS过滤算法调校参数

作者: 飞叔Brother | 来源:发表于2018-07-05 14:06 被阅读248次

对源程序的参数校调,下述如有不解,请到群内讨论:624108656

超级热心的群

源程序:源程序,上代码:


package als

import org.apache.log4j.{Level, Logger}

import org.apache.spark.mllib.recommendation.{ALS,MatrixFactorizationModel, Rating}

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import org.joda.time.{DateTime, Duration}

/.*

* Created by Weipengfei on 2017/5/3 0003.

* ALS过滤算法调校参数

*/

object AlsEvaluation {

     /.*

     *设置日志及乱七八糟的配置

     */

     def SetLogger: Unit ={

          System.setProperty("hadoop.home.dir","E:\\John\\hadoop-2.7.3")

          Logger.getLogger("org").setLevel(Level.OFF)

          Logger.getLogger("com").setLevel(Level.OFF)

          System.setProperty("spark.ui.showConsoleProgress","false")

          Logger.getRootLogger.setLevel(Level.OFF)

     }

     /.*

     * 数据准备

     * @return返回(训练数据,评估数据,测试数据)

     */

     defPrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={

          val sc=newSparkContext(newSparkConf().setAppName("Recommend").setMaster("local[2]").set("spark.testing.memory","21474800000"))

          //创建用户评分数据

          print("开始读取用户评分数据中...")

          val rawUserData=sc.textFile("hdfs://v100:8020//weipengfei//data//u.data")

          val rawRatings=rawUserData.map(_.split("\t").take(3))

          val ratingsRDD=rawRatings.map{

               case Array(user,movie,rating) => Rating( user.toInt,movie.toInt,rating.toFloat)

          }

          println("共计:"+ratingsRDD.count().toString+"条评分")

          //创建电影ID和名称对应表

          print("开始读取电影数据中...")

          val itemRDD=sc.textFile("hdfs://v100:8020//weipengfei//data//u.item")

          val moiveTitle=itemRDD.map(_.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap

          //显示数据记录数

          val numRatings=ratingsRDD.count()

          val numUser=ratingsRDD.map(_.user).distinct().count()

          val numMoive=ratingsRDD.map(_.product).distinct().count()

          println("共计:评分"+numRatings+"条 用户"+numUser+"个电影"+numMoive+"个")

          //将数据分为三个部分并且返回

          print("将数据分为:")

          val Array(trainData,validationData,testData)=ratingsRDD.randomSplit(Array(0.8,0.1,0.1))

          println("训练数据:"+trainData.count()+"条评估数据:"+validationData.count()+"条 测试数据:"+testData.count()+"条")

          (trainData,validationData,testData)

     }

     /.*

     *计算RMSE值

     * @parammodel 训练模型

     * @paramvalidationData 评估数据

     * @returnRMSE值

     */

     def computeRmse(model: MatrixFactorizationModel,validationData: RDD[Rating]):(Double) ={

          val num=validationData.count();

          val predictedRDD=model.predict(validationData.map(r=>(r.user,r.product)))

          val predictedAndVali=predictedRDD.map(p=>((p.user,p.product),p.rating)).join(validationData.map(r=>((r.user,r.product),r.rating))).values

          math.sqrt(predictedAndVali.map(x=>(x._1-x._2)*(x._1-x._2)).reduce(_+_)/num)

     }

     /.*

     * 训练模型

     * @paramtrainData 训练数据

     * @paramvalidationData 评估数据

     * @paramrank 训练模型参数

     * @paramnumIterations 训练模型参数

     * @paramlambda 训练模型参数

     * @return模型返回的RMSE(该值越小,误差越小)值,训练模型所需要的时间

     */

     def trainModel(trainData: RDD[Rating],validationData: RDD[Rating], rank: Int, numIterations: Int, lambda:Double):(Double,Double)={

          val startTime=new DateTime()

          val model=ALS.train(trainData,rank,numIterations,lambda)

          val endTime=new DateTime()

          val Rmse=computeRmse(model,validationData)

          val duration=new Duration(startTime,endTime)

          println(f"训练参数:rank:$rank= 迭代次数:$numIterations%.2flambda:$lambda%.2f 结果 Rmse $Rmse%.2f"+"训练需要时间:"+duration.getMillis+"毫秒")

          (Rmse,duration.getStandardSeconds)

     }

     /.*

     *使用jfree.char评估单个参数,这里没有实现

     * @paramtrainData 训练数据

     * @paramvalidationData 评估数据

     * @paramevaluateParameter 评估参数名称

     * @paramrankArray rank参数数组

     * @paramnumIterationsArray 迭代次数参数数组

     * @paramlambdaArray lambda参数数组

     */

     def evaluateParameter(trainData: RDD[Rating],validationData: RDD[Rating], evaluateParameter: String, rankArray:Array[Int], numIterationsArray: Array[Int], lambdaArray:Array[Double]): Unit ={

          //替换不同的参数训练模型做评估,会返回RMSE值和时间。通过这两个值来用jfree.char绘制柱状图和折线图来评估单个参数

          //todo

     }

     /.*

     *三个参数交叉评估,找出最好的参数组合

     * @paramtrainData 训练数据

     * @paramvalidationData 评估数据

     * @paramrankArray rank参数数组

     * @paramnumIterationsArray 迭代次数参数数组

     * @paramlambdaArray lambda参数数组

     * @return返回由最好参数组合训练出的模型

     */

     def evaluateAllParameter(trainData: RDD[Rating],validationData: RDD[Rating], rankArray: Array[Int],numIterationsArray: Array[Int], lambdaArray: Array[Double]):MatrixFactorizationModel= {

          val evaluations=for(rank <- rankArray;numIterations <-numIterationsArray;lambda <- lambdaArray) yield {

               val (rmse,time)=trainModel(trainData,validationData,rank,numIterations,lambda)

               (rank,numIterations,lambda,rmse)

          }

          val Eval=(evaluations.sortBy(_._4))

          val bestEval=eval_r(0)

          println("最佳模型参数:rank:"+bestEval._1+" 迭代次数:"+bestEval._2+"lambda:"+bestEval._3+" 结果rmse:"+bestEval._4)

          val bestModel=ALS.train(trainData,bestEval._1,bestEval._2,bestEval._3)

          (bestModel)

     }

     /.*

     * 训练评估

     * @paramtrainData 训练数据

     * @paramvalidationData 评估数据

     * @return返回一个最理想的模型

     */

     def trainValidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={

          println("------所有参数交叉评估找出最好的参数组合------")

          val

bestmodel=evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.01,0.05,0.1,1,5))

          bestmodel

     }

     def main(args: Array[String]) {

          SetLogger

          println("========================数据准备阶段========================")

          val (trainData,validationData,testData)=PrepareData()

          trainData.persist();validationData.persist();testData.persist()

          println("========================训练验证阶段========================")

          val bestModel=trainValidation(trainData,validationData)

          println("======================测试阶段===========================")

          val testRmse=computeRmse(bestModel,testData)

          println("使用bestModel测试testData,结果rmse="+testRmse)

     }

}

相关文章

  • sparkMllib的ALS过滤算法调校参数

    对源程序的参数校调,下述如有不解,请到群内讨论:624108656 源程序:源程序,上代码: package al...

  • sparkMllib的ALS过滤算法

    依托spark平台完成的电影推荐系统,上代码: 后续有对该程序的参数校调:参数校调程序 该程序应用到的数据:数据 ...

  • SparkMLlib ALS算法

    本次试验使用movieLens数据集的一千万行评分数据进行训练和预测,每个用户取预测评分最高的Top10存入HBa...

  • implicit 库 ALS 算法分析

    implicit 库 ALS 算法分析 ALS 算法 推荐模型基于“隐式反馈数据集的协同过滤”一文中描述的算法,其...

  • (9)推荐算法

    (1)ALS交替最小二乘算法 ALS指使用交替最小二乘法求解的协同过滤算法。通过观察到的所有用户给产品...

  • 协同过滤-ALS算法

    ALS算法应用场景 ALS属于数据挖掘,可以做推荐系统,比如电影推荐,商品推荐,广告推荐等. 原理就是给各个指标,...

  • 总结

    1.ALS 2.基于ALS算法的改进 3.实验结果分析 4.结论 1.ALS 1.1ALS算法的基本思想 ALS(...

  • pyspark协同过滤算法(ALS)

    ALS:Alternating Least Square,交替最小二乘法,用于推荐系统 算法原理 假设有一矩阵R,...

  • ALS协同过滤推荐算法【Spark MLlib】

    交替最小二乘法(ALS)是统计分析中最常用的逼近计算的一种算法,其交替计算结果使得最终结果尽可能地逼近真实结果。而...

  • spark自带的ALS算法实现协同过滤

    环境:spark1.6.0 scala2.11.4使用的数据集是tpch数据集 第一步进行文件的读取,将读取到的d...

网友评论

    本文标题:sparkMllib的ALS过滤算法调校参数

    本文链接:https://www.haomeiwen.com/subject/pihpuftx.html