美文网首页大数据
Spark实时离线电影推荐系统

Spark实时离线电影推荐系统

作者: 艾剪疏 | 来源:发表于2018-12-04 09:42 被阅读442次

1 项目介绍
2 涉及的技术
3 推荐流程图
4 收获
5 问题

1 项目介绍

  1. 使用Spark框架实现电影推荐系统;
  2. 运用数据挖掘的算法产生模型,为用户精准推荐喜好的电影;
  3. 分别通过离线和实时两种方式实现电影推荐系统;

2 涉及技术

  1. Spark:基于内存的分布式计算框架

  2. Hadoop:分布式离线计算框架

  3. Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行

  4. Kafka:分布式高并发消息队列,负责缓存Flume采集的数据并为下游的各种计算提供高并发的数据处理

  5. Hbase:亿级行百万列并可毫秒级查询的数据库,可快速查询我们的计算数据

  6. Phoenix:是构建在HBase上的SQL中间层,Phoenix查询引擎会将SQL查询转换为一个或者多个HBase Scan,并行执行以生成标准的JDBC结果集。

3 推荐流程图

image.png

解释如下:

  1. 加载HDFS数据,处理之后存储到Hive中;
  2. 离线推荐部分技术处理思路;
  • 从Hive中加载训练数据和测试数据
  • 使用SparkMLlib的ALS交替最小二乘法训练模型
  • 使用模型产生推荐结果
  • 将推荐结果写入到Mysql、Hive、Phoenix+Hbase中
  1. 实时推荐部分技术处理思路;
  • 从Hive中拿出数据
  • 取出测试数据集中数据,send到Kafka中。
  • 通过SparkStreaming主动Kafka消息队列获取数据,并根据用户是否为新用户制定推荐策略
  • 新用户,从训练数据集中取出浏览人数最多的电影的前5部作为推荐结果
  • 老用户,使用推荐模型为用户推荐5部电影
image.png

4 收获

1 大数据环境搭建

(1)单机版Hadoop、Spark、Hive、Mysql的搭建
Spark处理HDFS数据,并将结果存储在Hive中
配置一台Hive + Mysql元数据库

2 数据初始预处理

object ETL {
  def main(args: Array[String]): Unit = {
    val localClusterURL = "local[2]"
    val clusterMasterURL = "spark://s1:7077"
    val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val hc = new HiveContext(sc)
    import sqlContext.implicits._
    hc.sql("use moive_recommend")

    // 设置RDD的partition的数量一般以集群分配给应用的CPU核数的整数倍为宜。
    val minPartitions = 8
    // 通过case class来定义Links的数据结构,数据的schema,适用于schama已知的数据
    // 也可以通过StructType的方式,适用于schema未知的数据,具体参考文档:
    //http://spark.apache.org/docs/1.6.2/sql-programming-guide.html#programmatically-specifying-the-schema
    val links = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/links.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Links(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toInt)).toDF()

    val movies = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/movies.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Movies(x(0).trim.toInt,x(1).trim(),x(2).trim())).toDF()

    val ratings = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/ratings.txt",minPartitions).filter{ !_.endsWith(",")}.map(_.split(","))
      .map(x =>Ratings(x(0).trim.toInt,x(1).trim().toInt,x(2).trim().toDouble,x(3).trim().toInt)).toDF()

    val tags = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/tags.txt", minPartitions).filter { !_.endsWith(",") }.map(x=>rebuild(x))
      .map(_.split(",")).map(x => Tags(x(0).trim().toInt, x(1).trim().toInt, x(2).trim(), x(3).trim().toInt)).toDF()


    links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
    hc.sql("drop table if exists links")
    hc.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")


    movies.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/movies")
    hc.sql("drop table if exists movies")
    hc.sql("create table if not exists movies(movieId int,title string,genres string) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/movies' overwrite into table movies")


    ratings.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/ratings")
    hc.sql("drop table if exists ratings")
    hc.sql("create table if not exists ratings(userId int,movieId int,rating double,timestamp int) stored as parquet" )
    hc.sql("load data inpath '/home/spark/temp/moiveRec/ratings' overwrite into table ratings")


    tags.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/tags")
    hc.sql("drop table if exists tags")
    hc.sql("create table if not exists tags(userId int,movieId int,tag string,timestamp int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/tags' overwrite into table tags")
  }

  // tags中大部分数据格式如下:
  //    4208,260,Action-packed,1438012536
  // 但会出现如下的数据:
  //    4208,260,"Family,Action-packed",1438012562
  // 这样对数据split后插入hive中就会出错,需清洗数据:
  //    4208,260,"Family,Action-packed",1438012562 => 4208,260,FamilyAction-packed,1438012562
  private def rebuild(input:String):String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",")//提取列表的前2个元素
    val tail = a.takeRight(1).mkString//提取列表的最后1个元素
    val b = a.drop(2).dropRight(1).mkString.replace("\"", "")
    val output = head + "," + b + "," + tail
    output
  }
}

3 Hive的使用

配置一台Hive + Mysql元数据库

4 SparkMLlib机器学习算法库的使用

/**
  * KafkaProducer从测试数据集中取出数据
  */
object Spark_MovieTraining extends AppConf {
  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    // 训练集,总数据集的60%
    val trainingData = hc.sql("select * from trainingData")
    val ratingRDD = hc.sql("select * from trainingData")
      .rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))

    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratingRDD, rank, numIterations, 0.01)

    // Evaluate the model on rating data
    val training = ratingRDD.map {
      case Rating(userid, movieid, rating) => (userid, movieid)
    }

    ratingRDD.persist()
    training.persist()

    val predictions =
      model.predict(training).map {
        case Rating(userid, movieid, rating) => ((userid, movieid), rating)
      }

    val ratesAndPreds = ratingRDD.map { case Rating(userid, movieid, rating) =>
      ((userid, movieid), rating)
    }.join(predictions)


    val MSE = ratesAndPreds.map { case ((userid, movieid), (r1, r2)) =>
      val err = (r1 - r2)
      err * err
    }.mean()

    println(s"Mean Squared Error = $MSE")

    // Save and load model
    model.save(sc, s"/home/spark/temp/moiveRec/BestModel1/$MSE")
    //val sameModel = MatrixFactorizationModel.load(sc, "/home/spark/temp/moiveRec/BestModel/")
  }
}

5 实时推荐部分Kafka + Streaming + Phoenix+Hbase流处理

object KafkaProducer extends AppConf {

  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    val testDF = hc.sql("select * from testData limit 10000")
    val prop = new Properties()
    // 指定kafka的 ip地址:端口号
    prop.put("bootstrap.servers", "s1:9092")
    // 设定ProducerRecord发送的key值为String类型
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // 设定ProducerRecord发送的value值为String类型
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val topic = "movie"
    val testData = testDF.map(
      x => (topic, x.getInt(0).toString() + "," + x.getInt(1).toString + "," + x.getDouble(2).toString())
    )
    val producer = new KafkaProducer[String, String](prop)
    // 如果服务器内存不够,会出现OOM错误
    val messages = testData.toLocalIterator
    while (messages.hasNext) {
      val message = messages.next()
      val record = new ProducerRecord[String, String](topic, message._1, message._2)
      println(record)
      producer.send(record)
      // 延迟10毫秒
      Thread.sleep(10)
    }
    producer.close()
  }
}
/**
  * 接收kafka产生的数据,进行处理
  */
object SparkDirectStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDirectStream").setMaster("spark://s1:7077")
    // Duration对象中封装了时间的一个对象,它的单位是ms.
    val batchDuration = new Duration(5000)
    // batchDuration为时间间隔
    val ssc = new StreamingContext(conf, batchDuration)
    val hc = new HiveContext(ssc.sparkContext)

    // 训练数据中是否有该用户
    val validusers = hc.sql("select * from trainingData")
    val userlist = validusers.select("userId")

    val modelpath = "/home/spark/temp/moiveRec/BestModel1/0.5366434001808432"
    val broker = "s1:9092"

    // val topics = "movie".split(",").toSet
    val topics = Set("movie")
    // val kafkaParams = Map("bootstrap.servers" -> "spark1:9092")
    val kafkaParams = Map("metadata.broker.list" -> "s1:9092")

    def exist(u: Int): Boolean = {
      val userlist = hc.sql("select distinct(userid) from trainingdata").rdd.map(x => x.getInt(0)).toArray()
      userlist.contains(u)
    }

    // 为没有登录的用户推荐电影的策略:
    // 1.推荐观看人数较多的电影,采用这种策略
    // 2.推荐最新的电影
    val defaultrecresult = hc.sql("select * from pop5result").rdd.toLocalIterator

    // 创建SparkStreaming接收kafka消息队列数据的2种方式
    // 一种是Direct approache,通过SparkStreaming自己主动去Kafka消息队
    // 列中查询还没有接收进来的数据,并把他们拿到sparkstreaming中。
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val model = MatrixFactorizationModel.load(ssc.sparkContext, modelpath)

    val messages = kafkaDirectStream.foreachRDD { rdd =>
//      println(rdd)
      val userrdd = rdd.map(x => x._2.split(",")).map(x => x(1)).map(_.toInt)
      val validusers = userrdd.filter(user => exist(user))
      val newusers = userrdd.filter(user => !exist(user))
      // 采用迭代器的方式来避开对象不能序列化的问题。
      // 通过对RDD中的每个元素实时产生推荐结果,将结果写入到redis,或者其他高速缓存中,来达到一定的实时性。
      // 2个流的处理分成2个sparkstreaming的应用来处理。
      val validusersIter = validusers.toLocalIterator
      val newusersIter = newusers.toLocalIterator
      while (validusersIter.hasNext) {
        val recresult = model.recommendProducts(validusersIter.next, 5)
        println("below movies are recommended for you :")
        println(recresult)
      }
      while (newusersIter.hasNext) {
        println("below movies are recommended for you :")
        for (i <- defaultrecresult) {
          println(i.getString(0))
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

5 问题

依旧不熟悉scala语言,在使用Spark时很多东西依旧不知道

参考文献

http://www.dajiangtai.com/course/56.do


end

相关文章

网友评论

    本文标题:Spark实时离线电影推荐系统

    本文链接:https://www.haomeiwen.com/subject/eoipcqtx.html