美文网首页大数据
Spark实时离线电影推荐系统

Spark实时离线电影推荐系统

作者: 艾剪疏 | 来源:发表于2018-12-04 09:42 被阅读442次

    1 项目介绍
    2 涉及的技术
    3 推荐流程图
    4 收获
    5 问题

    1 项目介绍

    1. 使用Spark框架实现电影推荐系统;
    2. 运用数据挖掘的算法产生模型,为用户精准推荐喜好的电影;
    3. 分别通过离线和实时两种方式实现电影推荐系统;

    2 涉及技术

    1. Spark:基于内存的分布式计算框架

    2. Hadoop:分布式离线计算框架

    3. Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行

    4. Kafka:分布式高并发消息队列,负责缓存Flume采集的数据并为下游的各种计算提供高并发的数据处理

    5. Hbase:亿级行百万列并可毫秒级查询的数据库,可快速查询我们的计算数据

    6. Phoenix:是构建在HBase上的SQL中间层,Phoenix查询引擎会将SQL查询转换为一个或者多个HBase Scan,并行执行以生成标准的JDBC结果集。

    3 推荐流程图

    image.png

    解释如下:

    1. 加载HDFS数据,处理之后存储到Hive中;
    2. 离线推荐部分技术处理思路;
    • 从Hive中加载训练数据和测试数据
    • 使用SparkMLlib的ALS交替最小二乘法训练模型
    • 使用模型产生推荐结果
    • 将推荐结果写入到Mysql、Hive、Phoenix+Hbase中
    1. 实时推荐部分技术处理思路;
    • 从Hive中拿出数据
    • 取出测试数据集中数据,send到Kafka中。
    • 通过SparkStreaming主动Kafka消息队列获取数据,并根据用户是否为新用户制定推荐策略
    • 新用户,从训练数据集中取出浏览人数最多的电影的前5部作为推荐结果
    • 老用户,使用推荐模型为用户推荐5部电影
    image.png

    4 收获

    1 大数据环境搭建

    (1)单机版Hadoop、Spark、Hive、Mysql的搭建
    Spark处理HDFS数据,并将结果存储在Hive中
    配置一台Hive + Mysql元数据库

    2 数据初始预处理

    object ETL {
      def main(args: Array[String]): Unit = {
        val localClusterURL = "local[2]"
        val clusterMasterURL = "spark://s1:7077"
        val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val hc = new HiveContext(sc)
        import sqlContext.implicits._
        hc.sql("use moive_recommend")
    
        // 设置RDD的partition的数量一般以集群分配给应用的CPU核数的整数倍为宜。
        val minPartitions = 8
        // 通过case class来定义Links的数据结构,数据的schema,适用于schama已知的数据
        // 也可以通过StructType的方式,适用于schema未知的数据,具体参考文档:
        //http://spark.apache.org/docs/1.6.2/sql-programming-guide.html#programmatically-specifying-the-schema
        val links = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/links.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
          .map(x =>Links(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toInt)).toDF()
    
        val movies = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/movies.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
          .map(x =>Movies(x(0).trim.toInt,x(1).trim(),x(2).trim())).toDF()
    
        val ratings = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/ratings.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
          .map(x =>Ratings(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toDouble,x(3).trim().toInt)).toDF()
    
        val tags = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/tags.txt", minPartitions).filter { !_.endsWith(",") }.map(x=>rebuild(x))
          .map(_.split(",")).map(x => Tags(x(0).trim().toInt, x(1).trim().toInt, x(2).trim(), x(3).trim().toInt)).toDF()
    
    
        links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
        hc.sql("drop table if exists links")
        hc.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet" )
        hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")
    
    
        movies.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/movies")
        hc.sql("drop table if exists movies")
        hc.sql("create table if not exists movies(movieId int,title string,genres string) stored as parquet" )
        hc.sql("load data inpath '/home/spark/temp/moiveRec/movies' overwrite into table movies")
    
    
        ratings.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/ratings")
        hc.sql("drop table if exists ratings")
        hc.sql("create table if not exists ratings(userId int,movieId int,rating double,timestamp int) stored as parquet" )
        hc.sql("load data inpath '/home/spark/temp/moiveRec/ratings' overwrite into table ratings")
    
    
        tags.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/tags")
        hc.sql("drop table if exists tags")
        hc.sql("create table if not exists tags(userId int,movieId int,tag string,timestamp int) stored as parquet")
        hc.sql("load data inpath '/home/spark/temp/moiveRec/tags' overwrite into table tags")
      }
    
      // tags中大部分数据格式如下:
      //    4208,260,Action-packed,1438012536
      // 但会出现如下的数据:
      //    4208,260,"Family,Action-packed",1438012562
      // 这样对数据split后插入hive中就会出错,需清洗数据:
      //    4208,260,"Family,Action-packed",1438012562 => 4208,260,FamilyAction-packed,1438012562
      private def rebuild(input:String):String = {
        val a = input.split(",")
        val head = a.take(2).mkString(",")//提取列表的前2个元素
        val tail = a.takeRight(1).mkString//提取列表的最后1个元素
        val b = a.drop(2).dropRight(1).mkString.replace("\"", "")
        val output = head + "," + b + "," + tail
        output
      }
    }
    
    

    3 Hive的使用

    配置一台Hive + Mysql元数据库

    4 SparkMLlib机器学习算法库的使用

    /**
      * KafkaProducer从测试数据集中取出数据
      */
    object Spark_MovieTraining extends AppConf {
      def main(args: Array[String]): Unit = {
        hc.sql("use moive_recommend")
        // 训练集,总数据集的60%
        val trainingData = hc.sql("select * from trainingData")
        val ratingRDD = hc.sql("select * from trainingData")
          .rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))
    
        // Build the recommendation model using ALS
        val rank = 10
        val numIterations = 10
        val model = ALS.train(ratingRDD, rank, numIterations, 0.01)
    
        // Evaluate the model on rating data
        val training = ratingRDD.map {
          case Rating(userid, movieid, rating) => (userid, movieid)
        }
    
        ratingRDD.persist()
        training.persist()
    
        val predictions =
          model.predict(training).map {
            case Rating(userid, movieid, rating) => ((userid, movieid), rating)
          }
    
        val ratesAndPreds = ratingRDD.map { case Rating(userid, movieid, rating) =>
          ((userid, movieid), rating)
        }.join(predictions)
    
    
        val MSE = ratesAndPreds.map { case ((userid, movieid), (r1, r2)) =>
          val err = (r1 - r2)
          err * err
        }.mean()
    
        println(s"Mean Squared Error = $MSE")
    
        // Save and load model
        model.save(sc, s"/home/spark/temp/moiveRec/BestModel1/$MSE")
        //val sameModel = MatrixFactorizationModel.load(sc, "/home/spark/temp/moiveRec/BestModel/")
      }
    }
    

    5 实时推荐部分Kafka + Streaming + Phoenix+Hbase流处理

    object KafkaProducer extends AppConf {
    
      def main(args: Array[String]): Unit = {
        hc.sql("use moive_recommend")
        val testDF = hc.sql("select * from testData limit 10000")
        val prop = new Properties()
        // 指定kafka的 ip地址:端口号
        prop.put("bootstrap.servers", "s1:9092")
        // 设定ProducerRecord发送的key值为String类型
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // 设定ProducerRecord发送的value值为String类型
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val topic = "movie"
        val testData = testDF.map(
          x => (topic, x.getInt(0).toString() + "," + x.getInt(1).toString + "," + x.getDouble(2).toString())
        )
        val producer = new KafkaProducer[String, String](prop)
        // 如果服务器内存不够,会出现OOM错误
        val messages = testData.toLocalIterator
        while (messages.hasNext) {
          val message = messages.next()
          val record = new ProducerRecord[String, String](topic, message._1, message._2)
          println(record)
          producer.send(record)
          // 延迟10毫秒
          Thread.sleep(10)
        }
        producer.close()
      }
    }
    
    /**
      * 接收kafka产生的数据,进行处理
      */
    object SparkDirectStream {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkDirectStream").setMaster("spark://s1:7077")
        // Duration对象中封装了时间的一个对象,它的单位是ms.
        val batchDuration = new Duration(5000)
        // batchDuration为时间间隔
        val ssc = new StreamingContext(conf, batchDuration)
        val hc = new HiveContext(ssc.sparkContext)
    
        // 训练数据中是否有该用户
        val validusers = hc.sql("select * from trainingData")
        val userlist = validusers.select("userId")
    
        val modelpath = "/home/spark/temp/moiveRec/BestModel1/0.5366434001808432"
        val broker = "s1:9092"
    
        // val topics = "movie".split(",").toSet
        val topics = Set("movie")
        // val kafkaParams = Map("bootstrap.servers" -> "spark1:9092")
        val kafkaParams = Map("metadata.broker.list" -> "s1:9092")
    
        def exist(u: Int): Boolean = {
          val userlist = hc.sql("select distinct(userid) from trainingdata").rdd.map(x => x.getInt(0)).toArray()
          userlist.contains(u)
        }
    
        // 为没有登录的用户推荐电影的策略:
        // 1.推荐观看人数较多的电影,采用这种策略
        // 2.推荐最新的电影
        val defaultrecresult = hc.sql("select * from pop5result").rdd.toLocalIterator
    
        // 创建SparkStreaming接收kafka消息队列数据的2种方式
        // 一种是Direct approache,通过SparkStreaming自己主动去Kafka消息队
        // 列中查询还没有接收进来的数据,并把他们拿到sparkstreaming中。
        val kafkaDirectStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    
        val model = MatrixFactorizationModel.load(ssc.sparkContext, modelpath)
    
        val messages = kafkaDirectStream.foreachRDD { rdd =>
    //      println(rdd)
          val userrdd = rdd.map(x => x._2.split(",")).map(x => x(1)).map(_.toInt)
          val validusers = userrdd.filter(user => exist(user))
          val newusers = userrdd.filter(user => !exist(user))
          // 采用迭代器的方式来避开对象不能序列化的问题。
          // 通过对RDD中的每个元素实时产生推荐结果,将结果写入到redis,或者其他高速缓存中,来达到一定的实时性。
          // 2个流的处理分成2个sparkstreaming的应用来处理。
          val validusersIter = validusers.toLocalIterator
          val newusersIter = newusers.toLocalIterator
          while (validusersIter.hasNext) {
            val recresult = model.recommendProducts(validusersIter.next, 5)
            println("below movies are recommended for you :")
            println(recresult)
          }
          while (newusersIter.hasNext) {
            println("below movies are recommended for you :")
            for (i <- defaultrecresult) {
              println(i.getString(0))
            }
          }
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    5 问题

    依旧不熟悉scala语言,在使用Spark时很多东西依旧不知道

    参考文献

    http://www.dajiangtai.com/course/56.do


    end

    相关文章

      网友评论

        本文标题:Spark实时离线电影推荐系统

        本文链接:https://www.haomeiwen.com/subject/eoipcqtx.html