使用Spark创建搜索引擎

作者: 文哥的学习日记 | 来源:发表于2017-09-01 00:38 被阅读952次

    1、背景介绍

    1.1 项目背景

    假设有一个在线电影网站,会员可以付费在线观赏电影,公司希望能偶运用大数据分析推荐引擎,增加会员观看影片的次数以增加营业收入,主要有以下两个目的:
    针对用户推荐感兴趣的电影:可以针对每一个会员,定期发送短信或者E-mail或会员登录时,推荐给他/她可能会感兴趣的电影。
    针对电影推荐给感兴趣的用户:当想要促销某些电影时,也可以找出可能会对这些电影感兴趣的会员。

    2、数据背景

    我们使用https://grouplens.org/datasets/movielens/网站提供的电影评分数据,如下图所示,我们选择ml-100k数据。

    2、ALS算法介绍

    该算法介绍可以参考博客:http://blog.csdn.net/oucpowerman/article/details/49847979,这里就不再详细介绍。

    3、数据查看

    我们使用spark2.1和scala2.11版本来实现我们的推荐系统,我们使用spark-shell命令来进入我们Spark的交互环境,创建一个SparkSession:

    接下来,我们导入我们的评分数据,并查看数据的一些特征:

    可以看懂,我们的数据一共分为4列,分别为用户id,电影id,评分以及日期时间,我们可以看一下每个字段的统计特征:

    4、使用ALS训练模型

    我们查看了原始的训练数据,原始的数据有四列,但是我们真正需要的只是其中的前三列,所以这里需要进行一个转换,要想使用ALS进行模型的训练,我们需要进一步将数据转换为RDD[Rating]类型,最后,我们可以进行训练,得到我们的模型。

    引入相关类库

    提取数据的前三列
    我们这里使用一个map的方法,原始数据的一行是一个字符串,我们需要使用split进行分割,分割之后我们提取前三个字段即可:

    可以看到,这里我们使用了匿名函数和匿名参数的方式,_就代表我们传入的每一行数据。

    转换为RDD[Rating]
    接下来,我们仍然使用map方法,将RDD中的每一行进行一个转换:

    模型训练
    接下来我们就可以使用ALS.train方法进行模型的训练了,返回的模型的类型是org.apache.spark.mllib.recommendation.MatrixFactorizationModel

    train方法接收的参数如下表:

    参数 说明
    Ratings:RDD[Rating] 输入的训练数据,格式为:Rating(UserID,productID,rating)的RDD
    rank:Int rank指当进行矩阵分解时,将原本矩阵A(mn)分解为X(mrank) 和 Y(rank*n)中的rank值
    Iterations:Int ALS算法迭代的次数
    lambda:Double 建议值0.01

    针对用户推荐电影
    我们已经完成了数据训练,接下来我们就可以对用户推荐电影,比如我们要对id为196的用户推荐5部电影:

    可见,该方法返回的是一个数组,我们进一步使用foreach方法打印每一行,返回的一条是Rating类型的,第一个代表用户id,第二个代表电影id,第三个代表推荐评分,评分最高说明越值得推荐。

    查看针对用户推荐产品的评分
    我们可以查询系统对用户推荐产品的评分,使用predict方法:

    针对电影推荐给用户
    当我们想要促销某些电影时,可以找出可能会对这些电影感兴趣的会员,我们可以使用model.recommendUsers方法针对某一部电影推荐相关的会员:

    可以看到,返回数据跟给用户推荐电影的格式是相同的。

    将电影名称和id进行绑定
    首先我们读取数据:

    读取后的每一条数据仍然是一个字符串,我们需要使用split进行分割,由于|是scala正则表达式中的一个特殊字符,所以我们需要进行转义,最后使用collectAsMap方法,将RDD转换成一个类似Map的数据,这样我们就可以通过电影id来查找电影的名字。

    最后,我们只需要做一个简单的map运算,用电影的名称替换电影的id即可。

    5、Demo完整代码

    package mllib.ALSrecommend
    
    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.sql.SparkSession
    
    
    object RecommendSystem {
    
      def main(args:Array[String]) = {
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
         // .enableHiveSupport()
          //.config("spark.some.config.option", "some-value")
          .getOrCreate()
        val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
        println(rawUserData)
        println(rawUserData.first())
        rawUserData.take(5).foreach(println)
        println(rawUserData.map(_.split("\t")(0).toDouble).stats())
        println(rawUserData.map(_.split("\t")(1).toDouble).stats())
        println(rawUserData.map(_.split("\t")(2).toDouble).stats())
        println(rawUserData.map(_.split("\t")(3).toDouble).stats())
    
        val rawRatings = rawUserData.map(_.split("\t").take(3))
        val ratingsRDD = rawRatings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
    
        val model = ALS.train(ratingsRDD,10,10,0.01)
        println(model)
    
        model.recommendProducts(196,5).foreach(println)
        println(model.predict(196,1643))
    
        model.recommendUsers(464,5).foreach(println)
    
        val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
        val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collectAsMap()
        movieTitle.take(5).foreach(println)
    
        println(movieTitle(146))
    
        model.recommendProducts(196,5).map(ratings=>(ratings.product,movieTitle(ratings.product),ratings.rating)).foreach(println)
    
      }
    }
    
    

    6、创建Recommend项目

    上面几部分从整体上介绍了如何通过Spark构造一个推荐引擎,但复用性不是很好,所以,我们将上面的代码进行整理,构造一个推荐系统的类,类中几个重要的函数如下:

    读取数据
    读取数据函数我们返回两部分,首先是RDD[Rating]类型的训练数据,其次是Map类型的电影id和电影名称的对应数据表。

     def PrepareData():(RDD[Rating],Map[Int,String])={
          val spark = SparkSession
            .builder()
            .appName("Spark SQL basic example")
            // .enableHiveSupport()
            //.config("spark.some.config.option", "some-value")
            .getOrCreate()
          print("开始读取用户评分数据")
          val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
          val ratings = rawUserData.map(_.split("\t").take(3))
          val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
          println("共计"+ratingsRDD.count.toString()+"条ratings")
    
          print("开始读取电影数据")
          val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
          val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
          //显示数据记录数
          val numRatings = ratingsRDD.count()
          val numUsers = ratingsRDD.map(_.user).distinct().count()
          val numMovies = ratingsRDD.map(_.product).distinct().count()
          println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
          return (ratingsRDD,movieTitle)
        }
    

    电影推荐和用户推荐
    接下来是电影推荐和用户推荐的函数:

    def RecommendMovies(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputUserID:Int):Unit= {
          val RecommendMovie = model.recommendProducts(inputUserID,10)
          var i = 1
          println("针对用户"+inputUserID+" 推荐下列电影:")
          RecommendMovie.foreach{r => println(i.toString()+"."+movieTitle(r.product)+" 评分:"+r.rating.toString());i+=1}
        }
    
    def RecommendUsers(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int):Unit= {
          val RecommendUser = model.recommendUsers(inputMovieID,10)
          var i = 1
          println("针对电影"+inputMovieID+" 推荐下列用户:")
          RecommendUser.foreach{r => println(i.toString()+"."+movieTitle(r.user)+" 评分:"+r.rating.toString());i+=1}
        }
    

    推荐入口

    def recommend(model:MatrixFactorizationModel,movieTitle:Map[Int,String]):Unit={
          var choose=""
          while(choose!="3"){
            print("请选择推荐类型,1:针对用户推荐电影,2:针对电影推荐给感兴趣的用户,3:离开")
            choose = scala.io.StdIn.readLine()
            if (choose=="1"){
              print("请输入用户id?")
              val inputUserID = scala.io.StdIn.readLine()
              RecommendMovies(model,movieTitle,inputUserID.toInt)
            }
            else if(choose=="2") {
              print("请输入电影的id?")
              val inputMovieID = scala.io.StdIn.readLine()
              RecommendUsers(model, movieTitle, inputMovieID.toInt)
            }
          }
    
        }
    

    主函数

    def main(args:Array[String]):Unit={
          setLogger
          val (ratings,movieTitle) = PrepareData()
          val model = ALS.train(ratings,5,20,0.1)
          recommend(model,movieTitle)
        }
    

    完整代码

    package mllib.ALSrecommend
    import org.apache.spark.mllib.recommendation.{ALS, Rating,MatrixFactorizationModel}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.rdd.RDD
    import scala.io.StdIn
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    
    object Recommend {
    
      def setLogger ={
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("com").setLevel(Level.OFF)
        System.setProperty("spark.ui.showConsoleProgress","false")
        Logger.getRootLogger().setLevel(Level.OFF)
      }
    
        def PrepareData():(RDD[Rating],Map[Int,String])={
          val spark = SparkSession
            .builder()
            .appName("Spark SQL basic example")
            // .enableHiveSupport()
            //.config("spark.some.config.option", "some-value")
            .getOrCreate()
          print("开始读取用户评分数据")
          val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
          val ratings = rawUserData.map(_.split("\t").take(3))
          val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
          println("共计"+ratingsRDD.count.toString()+"条ratings")
    
          print("开始读取电影数据")
          val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
          val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
          //显示数据记录数
          val numRatings = ratingsRDD.count()
          val numUsers = ratingsRDD.map(_.user).distinct().count()
          val numMovies = ratingsRDD.map(_.product).distinct().count()
          println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
          return (ratingsRDD,movieTitle)
        }
    
        def RecommendMovies(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputUserID:Int):Unit= {
          val RecommendMovie = model.recommendProducts(inputUserID,10)
          var i = 1
          println("针对用户"+inputUserID+" 推荐下列电影:")
          RecommendMovie.foreach{r => println(i.toString()+"."+movieTitle(r.product)+" 评分:"+r.rating.toString());i+=1}
        }
    
        def RecommendUsers(model:MatrixFactorizationModel,movieTitle:Map[Int,String],inputMovieID:Int):Unit= {
          val RecommendUser = model.recommendUsers(inputMovieID,10)
          var i = 1
          println("针对电影"+inputMovieID+" 推荐下列用户:")
          RecommendUser.foreach{r => println(i.toString()+"."+movieTitle(r.user)+" 评分:"+r.rating.toString());i+=1}
        }
    
        def recommend(model:MatrixFactorizationModel,movieTitle:Map[Int,String]):Unit={
          var choose=""
          while(choose!="3"){
            print("请选择推荐类型,1:针对用户推荐电影,2:针对电影推荐给感兴趣的用户,3:离开")
            choose = scala.io.StdIn.readLine()
            if (choose=="1"){
              print("请输入用户id?")
              val inputUserID = scala.io.StdIn.readLine()
              RecommendMovies(model,movieTitle,inputUserID.toInt)
            }
            else if(choose=="2") {
              print("请输入电影的id?")
              val inputMovieID = scala.io.StdIn.readLine()
              RecommendUsers(model, movieTitle, inputMovieID.toInt)
            }
          }
    
        }
        def main(args:Array[String]):Unit={
          setLogger
          val (ratings,movieTitle) = PrepareData()
          val model = ALS.train(ratings,5,20,0.1)
          recommend(model,movieTitle)
        }
    
    }
    
    

    7、ALS.train方法参数调优

    机器学习中很重要的一个部分就是对代码参数进行调优,调优的两个重要衡量指标是代码的运行时间以及运行的结果好坏,前面我们已经介绍了,ALS.train方法主要涉及了三个参数,分别为:rank,iterations和lambda,那么这三个参数分别如何影响代码的性能,以及如何选择三者的组合来达到最好的效率呢,这是本节主要讨论的目的。

    准备数据
    和上面有所不同的是,这里我们要准备数据,将数据分为训练集,验证集和测试集,返回三个RDD[Rating]类型的数据:

    def PrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          // .enableHiveSupport()
          //.config("spark.some.config.option", "some-value")
          .getOrCreate()
        print("开始读取用户评分数据")
        val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
        val ratings = rawUserData.map(_.split("\t").take(3))
        val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
        println("共计"+ratingsRDD.count.toString()+"条ratings")
    
        println("开始读取电影数据")
        val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
        val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
        //显示数据记录数
        val numRatings = ratingsRDD.count()
        val numUsers = ratingsRDD.map(_.user).distinct().count()
        val numMovies = ratingsRDD.map(_.product).distinct().count()
        println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
    
        println("以随机数的方式将数据分为3个部分并返回")
        val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
        println("trainData:"+trainData.count()+",validationData:"+validationData.count()+",testData:"+testData.count())
        return (trainData,validationData,testData)
      }
    

    训练评估

    def trainValidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={
        println("------评估rank参数使用------")
        evaluateParameter(trainData,validationData,"rank",Array(5,10,15,20,50,100),Array(10),Array(0.1))
        println("------评估numIterations参数使用------")
        evaluateParameter(trainData,validationData,"numIterations",Array(10),Array(5,10,15,20,25),Array(0.1))
        println("------评估lambda参数使用------")
        evaluateParameter(trainData,validationData,"lambda",Array(10),Array(10),Array(0.05,0.1,1,5,10))
        println("------所有参数交叉评估找出最好的参数组合------")
        val bestModel = evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.05,0.1,1,5,10))
        return bestModel
    

    可以看到,训练评估函数中涉及了两个方法,分别用于对单个参数进行测试和三个参数整体进行调优,两个方法分别如下:

    def evaluateParameter(trainData:RDD[Rating],validationData:RDD[Rating],evaluateParameter:String,rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double])={
        val dataBarChart = new DefaultCategoryDataset()
        val dataLineChart = new DefaultCategoryDataset()
        for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray){
          val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
          val parameterData = evaluateParameter match{
            case "rank" => rank;
            case "numIterations" => numIterations;
            case "lambda" => lambda
          }
          dataBarChart.addValue(rmse,evaluateParameter,parameterData.toString())
          dataLineChart.addValue(time,"Time",parameterData.toString())
    
        }
        Chart.plotBarLineChart("ALS evaluations " + evaluateParameter,evaluateParameter,"RMSE",0.58,5,"Time",dataBarChart,dataLineChart)
    
      }
    
    def evaluateAllParameter(trainData: RDD[Rating], validationData: RDD[Rating], rankArray: Array[Int], numIterationsArray: Array[Int], lambdaArray: Array[Double]):MatrixFactorizationModel={
        val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
          val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
          (rank,numIterations,lambda,rmse)
        }
        val Eval = (evaluations.sortBy(_._4))
        val BestEval = Eval(0)
        println("最佳model参数:rank:"+BestEval._1+",iterations:"+BestEval._2+"lambda:"+BestEval._3+"结果RMSE="+BestEval._4)
        val bestModel = ALS.train(trainData,BestEval._1,BestEval._2,BestEval._3)
        (bestModel)
      }
    
    

    其中,训练模型和计算RMSE的函数分别如下:

     def computeRmse(model:MatrixFactorizationModel,ratingRDD:RDD[Rating]):Double={
        val num = ratingRDD.count()
        val predictedRDD = model.predict(ratingRDD.map(line=>(line.user,line.product)))
        val kvRatingRDD = ratingRDD.map(r=>((r.user,r.product),r.rating))
        val predictedAndRatings = predictedRDD.map(p=>((p.user,p.product),p.rating)).join(kvRatingRDD).values
    
        math.sqrt(predictedAndRatings.map(x=>(x._1-x._2) * (x._1-x._2)).reduce((_+_))/num)
    
      }
    def trainModel(trainData:RDD[Rating],validationData:RDD[Rating],rank:Int,numIterations:Int,lambda:Double):(Double,Double) ={
        val startTime = new DateTime()
        val model = ALS.train(trainData,rank,numIterations,lambda)
        val endTime = new DateTime()
        val Rmse = computeRmse(model,validationData)
        val duration = new Duration(startTime,endTime)
        println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
        return (Rmse,duration.getStandardSeconds)
    
      }
    

    画图类
    在测试单个参数的性能时,我们引入了画图类,画图类的代码如下所示:

    package mllib.ALSrecommend
    
    import org.jfree.chart._
    import org.jfree.data.xy._
    import org.jfree.data.category.DefaultCategoryDataset
    import org.jfree.chart.axis.NumberAxis
    import org.jfree.chart.axis._
    import java.awt.Color
    import org.jfree.chart.renderer.category.LineAndShapeRenderer;
    import org.jfree.chart.plot.DatasetRenderingOrder;
    import org.jfree.chart.labels.StandardCategoryToolTipGenerator;
    import java.awt.BasicStroke
    
    object Chart {
      def plotBarLineChart(Title: String, xLabel: String, yBarLabel: String, yBarMin: Double, yBarMax: Double, yLineLabel: String, dataBarChart : DefaultCategoryDataset, dataLineChart: DefaultCategoryDataset): Unit = {
    
        //画出Bar Chart
        val chart = ChartFactory
          .createBarChart(
            "", // Bar Chart 标题
            xLabel, // X轴标题
            yBarLabel, // Bar Chart 标题 y轴标题l
            dataBarChart , // Bar Chart数据
            org.jfree.chart.plot.PlotOrientation.VERTICAL,//画图方向垂直
            true, // 包含 legend
            true, // 显示tooltips
            false // 不要URL generator
          );
        //取得plot
        val plot = chart.getCategoryPlot();
        plot.setBackgroundPaint(new Color(0xEE, 0xEE, 0xFF));
        plot.setDomainAxisLocation(AxisLocation.BOTTOM_OR_RIGHT);
        plot.setDataset(1, dataLineChart); plot.mapDatasetToRangeAxis(1, 1)
        //画直方图y轴
        val vn = plot.getRangeAxis(); vn.setRange(yBarMin, yBarMax);  vn.setAutoTickUnitSelection(true)
        //画折线图y轴
        val axis2 = new NumberAxis(yLineLabel); plot.setRangeAxis(1, axis2);
        val renderer2 = new LineAndShapeRenderer()
        renderer2.setToolTipGenerator(new StandardCategoryToolTipGenerator());
        //设置先画直方图,再画折线图以免折线图被盖掉
        plot.setRenderer(1, renderer2);plot.setDatasetRenderingOrder(DatasetRenderingOrder.FORWARD);
        //创建画框
        val frame = new ChartFrame(Title,chart); frame.setSize(500, 500);
        frame.pack(); frame.setVisible(true)
      }
    }
    

    主函数

    def main(args:Array[String]) = {
        setLogger
        println("========数据准备阶段========")
        val (trainData,validationData,testData) = PrepareData()
        trainData.persist()
        validationData.persist()
        testData.persist()
        println("========训练验证阶段========")
        val bestModel = trainValidation(trainData,validationData)
        println("========测试阶段========")
        val testRmse = computeRmse(bestModel,testData)
        println("使用testData测试bestModel的结果RMSE="+testRmse)
       //trainData.unpersist()
    //    validationData.unpersist()
       // testData.unpersist()
      }
    

    完整代码

    package mllib.ALSrecommend
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.mllib.recommendation.{ALS, Rating,MatrixFactorizationModel}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.rdd.RDD
    import org.jfree.data.category.DefaultCategoryDataset
    import org.joda.time.format._
    import org.joda.time._
    import org.joda.time.Duration
    
    object AlsEvaluation {
      def setLogger ={
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("com").setLevel(Level.OFF)
        System.setProperty("spark.ui.showConsoleProgress","false")
        Logger.getRootLogger().setLevel(Level.OFF)
      }
    
      def PrepareData():(RDD[Rating],RDD[Rating],RDD[Rating])={
        val spark = SparkSession
          .builder()
          .appName("Spark SQL basic example")
          // .enableHiveSupport()
          //.config("spark.some.config.option", "some-value")
          .getOrCreate()
        print("开始读取用户评分数据")
        val rawUserData = spark.sparkContext.textFile("data/ml-100k/u.data")
        val ratings = rawUserData.map(_.split("\t").take(3))
        val ratingsRDD = ratings.map{case Array(user,movie,rating)=>Rating(user.toInt,movie.toInt,rating.toDouble)}
        println("共计"+ratingsRDD.count.toString()+"条ratings")
    
        println("开始读取电影数据")
        val itemRDD = spark.sparkContext.textFile("data/ml-100k/u.item")
        val movieTitle = itemRDD.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
        //显示数据记录数
        val numRatings = ratingsRDD.count()
        val numUsers = ratingsRDD.map(_.user).distinct().count()
        val numMovies = ratingsRDD.map(_.product).distinct().count()
        println("共计:ratings:"+numRatings+" User:"+numUsers+" Movies:"+numMovies)
    
        println("以随机数的方式将数据分为3个部分并返回")
        val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
        println("trainData:"+trainData.count()+",validationData:"+validationData.count()+",testData:"+testData.count())
        return (trainData,validationData,testData)
    
      }
    
      def computeRmse(model:MatrixFactorizationModel,ratingRDD:RDD[Rating]):Double={
        val num = ratingRDD.count()
        val predictedRDD = model.predict(ratingRDD.map(line=>(line.user,line.product)))
        val kvRatingRDD = ratingRDD.map(r=>((r.user,r.product),r.rating))
        val predictedAndRatings = predictedRDD.map(p=>((p.user,p.product),p.rating)).join(kvRatingRDD).values
    
        math.sqrt(predictedAndRatings.map(x=>(x._1-x._2) * (x._1-x._2)).reduce((_+_))/num)
    
      }
      def trainModel(trainData:RDD[Rating],validationData:RDD[Rating],rank:Int,numIterations:Int,lambda:Double):(Double,Double) ={
        val startTime = new DateTime()
        val model = ALS.train(trainData,rank,numIterations,lambda)
        val endTime = new DateTime()
        val Rmse = computeRmse(model,validationData)
        val duration = new Duration(startTime,endTime)
        println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
        return (Rmse,duration.getStandardSeconds)
    
      }
    
      def evaluateParameter(trainData:RDD[Rating],validationData:RDD[Rating],evaluateParameter:String,rankArray:Array[Int],numIterationsArray:Array[Int],lambdaArray:Array[Double])={
        val dataBarChart = new DefaultCategoryDataset()
        val dataLineChart = new DefaultCategoryDataset()
        for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray){
          val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
          val parameterData = evaluateParameter match{
            case "rank" => rank;
            case "numIterations" => numIterations;
            case "lambda" => lambda
          }
          dataBarChart.addValue(rmse,evaluateParameter,parameterData.toString())
          dataLineChart.addValue(time,"Time",parameterData.toString())
    
        }
        Chart.plotBarLineChart("ALS evaluations " + evaluateParameter,evaluateParameter,"RMSE",0.58,5,"Time",dataBarChart,dataLineChart)
    
      }
    
      def evaluateAllParameter(trainData: RDD[Rating], validationData: RDD[Rating], rankArray: Array[Int], numIterationsArray: Array[Int], lambdaArray: Array[Double]):MatrixFactorizationModel={
        val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
          val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
          (rank,numIterations,lambda,rmse)
        }
        val Eval = (evaluations.sortBy(_._4))
        val BestEval = Eval(0)
        println("最佳model参数:rank:"+BestEval._1+",iterations:"+BestEval._2+"lambda:"+BestEval._3+"结果RMSE="+BestEval._4)
        val bestModel = ALS.train(trainData,BestEval._1,BestEval._2,BestEval._3)
        (bestModel)
      }
    
    
      def trainValidation(trainData:RDD[Rating],validationData:RDD[Rating]):MatrixFactorizationModel={
        println("------评估rank参数使用------")
        evaluateParameter(trainData,validationData,"rank",Array(5,10,15,20,50,100),Array(10),Array(0.1))
        println("------评估numIterations参数使用------")
        evaluateParameter(trainData,validationData,"numIterations",Array(10),Array(5,10,15,20,25),Array(0.1))
        println("------评估lambda参数使用------")
        evaluateParameter(trainData,validationData,"lambda",Array(10),Array(10),Array(0.05,0.1,1,5,10))
        println("------所有参数交叉评估找出最好的参数组合------")
        val bestModel = evaluateAllParameter(trainData,validationData,Array(5,10,15,20,50,100),Array(5,10,15,20,25),Array(0.05,0.1,1,5,10))
        return bestModel
    
      }
      def main(args:Array[String]) = {
        setLogger
        println("========数据准备阶段========")
        val (trainData,validationData,testData) = PrepareData()
        trainData.persist()
        validationData.persist()
        testData.persist()
        println("========训练验证阶段========")
        val bestModel = trainValidation(trainData,validationData)
        println("========测试阶段========")
        val testRmse = computeRmse(bestModel,testData)
        println("使用testData测试bestModel的结果RMSE="+testRmse)
       //trainData.unpersist()
    //    validationData.unpersist()
       // testData.unpersist()
      }
    }
    

    结果展示

    我们首先来看rank对模型训练的影响,可以看到,rank对模型的误差影响不大,但随着rank 的增加,模型训练时间会有所增加

    接下来我们看numIterations,该参数对模型的误差影响也不大,但随着参数数值的增大,模型运行时间变长。

    最后是lambda参数,lambda参数对模型的运行时间没有影响,但是随着lambda的增大,模型的训练误差逐渐增大。

    那么使用什么样的参数组合能够带来最佳的模型训练效果呢,答案如下:



    这里答案不是唯一的,每次训练可能得到不同的结果。

    8、代码知识点整理

    根据上面的两段完整的代码,我整理了一些我第一次接触到的,不是太熟悉的地方:

    设置打印输出
    spark运行时会显示很多log信息,信息太多会和正常信息混合在一起,所以可以根据如下代码设置不显示log信息:

    def setLogger ={
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("com").setLevel(Level.OFF)
        System.setProperty("spark.ui.showConsoleProgress","false")
        Logger.getRootLogger().setLevel(Level.OFF)
      }
    

    randomSplit函数切分RDD
    使用randomSplit函数对RDD进行切分:

     val Array(trainData,validationData,testData) = ratingsRDD.randomSplit(Array(0.8,0.8,0.1))
    

    打印输出的花样

    println(f"训练参数:rank:$rank%3d,iterations;$numIterations%.2f;lambda:$lambda%.2f 结果 RMSE=$Rmse%.2f"+"训练需要时间为:"+duration.getMillis+"毫秒")
    

    scala中使用yield
    Scala中的yield的主要作用是记住每次迭代中的有关值,并逐一存入到一个数组中。用法如下:for {子句} yield {变量或表达式}

    val evaluations = for(rank <- rankArray;numIterations <- numIterationsArray;lambda <- lambdaArray) yield {
          val (rmse,time) = trainModel(trainData,validationData,rank,numIterations,lambda)
          (rank,numIterations,lambda,rmse)
        }
    

    相关文章

      网友评论

        本文标题:使用Spark创建搜索引擎

        本文链接:https://www.haomeiwen.com/subject/vgajdxtx.html