Spark-SQL: Hands-On DataSet Operations

Author: 张明洋_4b13 | Published: 2019-03-10 11:34

    Dataset
    MovieLens 1M Dataset
    http://files.grouplens.org/datasets/movielens/ml-1m.zip

    users.dat
    UserID::Gender::Age::Occupation::Zip-code

    movies.dat
    MovieID::Title::Genres

    ratings.dat
    UserID::MovieID::Rating::Timestamp
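
The three layouts above all use `::` as the field separator. As a minimal sketch of what parsing one line involves, the plain-Scala helper below splits a line and converts the numeric fields, returning `None` for malformed input. The object name, the case class names, and the sample lines in the usage note are illustrative assumptions, not part of the original article.

```scala
// Hypothetical helper for illustration; field layouts follow the listing above.
object MovieLensLines {
  final case class UserRow(uid: String, gender: String, age: Int, occupation: String, zip: String)
  final case class RatingRow(uid: String, mid: String, rating: Double, timestamp: Long)

  // String.split takes a regex; "::" contains no regex metacharacters,
  // so it splits on the literal separator.
  def parseUser(line: String): Option[UserRow] = line.split("::") match {
    case Array(uid, gender, age, occupation, zip) =>
      // Try guards against a non-numeric age field.
      scala.util.Try(UserRow(uid, gender, age.toInt, occupation, zip)).toOption
    case _ => None // wrong number of fields
  }

  def parseRating(line: String): Option[RatingRow] = line.split("::") match {
    case Array(uid, mid, rating, ts) =>
      scala.util.Try(RatingRow(uid, mid, rating.toDouble, ts.toLong)).toOption
    case _ => None
  }
}
```

For a made-up line such as `7::M::25::3::00000`, `MovieLensLines.parseUser` yields a `UserRow` with `age = 25`, while a truncated line such as `7::M` yields `None` instead of throwing.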

    Maven dependencies

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.3.1</version>
        </dependency>
    
    

    Loading the data

        import org.apache.spark.sql.SparkSession

        // Wrapper object added so the snippet compiles; its name is arbitrary.
        object MovieLensApp {
          // Only the fields used by the queries below are kept.
          case class user(uid: String, gender: String, age: Int)
          case class rating(uid: String, mid: String, rat: Double)
          case class movie(mid: String, title: String)

          def main(args: Array[String]): Unit = {
            val spark = SparkSession
              .builder()
              .master("local[4]")
              .appName("test")
              .config("spark.sql.shuffle.partitions", "5")
              .getOrCreate()

            val sc = spark.sparkContext
            val sqlContext = spark.sqlContext
            import sqlContext.implicits._ // enables .toDF and the $"col" syntax

            val root = "D:/ml-1m/"
            // Fields in every file are separated by "::".
            val userRdd = sc.textFile(root + "users.dat").map(_.split("::"))
            val userdf = userRdd.map(x => user(x(0), x(1), x(2).toInt)).toDF

            val movieRdd = sc.textFile(root + "movies.dat").map(_.split("::"))
            val moviedf = movieRdd.map(x => movie(x(0), x(1))).toDF

            val ratingRdd = sc.textFile(root + "ratings.dat").map(_.split("::"))
            val ratingdf = ratingRdd.map(x => rating(x(0), x(1), x(2).toDouble)).toDF

            // The queries in the following sections run here, inside main.
          }
        }
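
The loading code above goes through RDDs and ends with untyped DataFrames. As a hedged alternative, the sketch below reads the ratings file straight into a typed `Dataset` with `spark.read.textFile`, skipping malformed lines. The object name `TypedRatings`, the capitalized `Rating` class, and the `parse`/`load` helpers are this sketch's own assumptions; the article itself does not use them.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object TypedRatings {
  // Mirrors the article's rating case class; the capitalized name is this sketch's own.
  final case class Rating(uid: String, mid: String, rat: Double)

  // Turns raw "UserID::MovieID::Rating::Timestamp" lines into a typed Dataset,
  // dropping lines with too few fields or a non-numeric rating.
  def parse(spark: SparkSession, lines: Dataset[String]): Dataset[Rating] = {
    import spark.implicits._ // provides the Encoder for Rating
    lines.flatMap[Rating] { line =>
      line.split("::") match {
        case Array(uid, mid, rat, _*) =>
          scala.util.Try(Rating(uid, mid, rat.toDouble)).toOption.toSeq
        case _ => Seq.empty[Rating]
      }
    }
  }

  // Reads the file as a Dataset[String], one element per line, then parses it.
  def load(spark: SparkSession, path: String): Dataset[Rating] =
    parse(spark, spark.read.textFile(path))
}
```

With the session from the article, `TypedRatings.load(spark, root + "ratings.dat")` would give a `Dataset[Rating]` on which typed lambdas such as `.filter(_.rat >= 4.0)` can be used alongside the column-based operators shown in the rest of the article.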
    
    

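    The 10 movies most popular with men in the "18-24" age group

        // MovieLens 1M stores age as a bucket code; code 18 stands for the 18-24 group
        val youngmale = userdf.filter("age = 18 and gender = 'M'")
        val youngrating = ratingdf.select("uid", "mid")
        val youngmovies = youngmale.join(youngrating, "uid").groupBy("mid").count.sort($"count".desc).limit(10)
        // A join does not guarantee row order, so sort again before printing
        youngmovies.join(moviedf, "mid").sort($"count".desc).select("title").show(false)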
    
    
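    The 10 highest-rated movies

        // Rename the aggregate columns: their default names, count(uid) and sum(rat),
        // contain parentheses, so SQL can only reference them when quoted in backticks
        val top = ratingdf.groupBy("mid").agg("uid" -> "count", "rat" -> "sum")
          .withColumnRenamed("count(uid)", "count")
          .withColumnRenamed("sum(rat)", "sum")
        top.createOrReplaceTempView("top")
        val top10 = sqlContext.sql("select mid, (sum/count) as avgscore from top").sort($"avgscore".desc).limit(10)
        top10.join(moviedf, "mid").sort($"avgscore".desc).select("title").show(false)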
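
The query above computes the mean as sum divided by count through a temporary view. A possible shortcut, sketched below, is to let the `avg` aggregate do this directly in the DataFrame API. The `minRatings` threshold is an assumption added here, because a plain average ranks a movie with a single 5-star rating at the top; the article does not apply such a filter. The names `TopRated`, `topRated`, and `n` are likewise hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, count}

object TopRated {
  // ratings is expected to have the columns mid and rat, like ratingdf in the article.
  // minRatings is a hypothetical cutoff: movies with fewer ratings are ignored.
  def topRated(ratings: DataFrame, n: Int, minRatings: Long): DataFrame =
    ratings
      .groupBy("mid")
      .agg(avg(col("rat")).as("avgscore"), count(col("rat")).as("n"))
      .filter(col("n") >= minRatings)
      // Ties on the average are broken by the number of ratings.
      .orderBy(col("avgscore").desc, col("n").desc)
      .limit(n)
}
```

Calling `TopRated.topRated(ratingdf, 10, 1)` should select the same kind of ranking as the SQL version, since a threshold of 1 filters nothing out; a larger threshold trades coverage for more stable averages. Joining the result with `moviedf` on `mid` then recovers the titles.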
    
    
    The 10 users who watched the most movies

    ratingdf.groupBy("uid").count().sort(-$"count").show(10)
    
    
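    The 10 movies most watched by men

        val male = userdf.filter("gender='M'").select("uid")
        val allmovies = ratingdf.select("uid", "mid")
        // .repartition($"count") was tried on this line and had no effect
        val maleid = male.join(allmovies, "uid").groupBy("mid").count.sort($"count".desc).limit(10)
        // A join does not guarantee row order, so sort again before printing
        maleid.join(moviedf, "mid").sort($"count".desc).select("title").show(false)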
    
    
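    The 10 movies most watched by women

        // allmovies is the (uid, mid) projection of ratingdf from the previous section
        val female = userdf.filter("gender='F'").select("uid")
        val femaleid = female.join(allmovies, "uid").groupBy("mid").count.sort($"count".desc).limit(10)
        // A join does not guarantee row order, so sort again before printing
        femaleid.join(moviedf, "mid").sort($"count".desc).select("title").show(false)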
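
The queries for men and for women differ only in the gender literal, so they can be folded into one function. The sketch below is one way to do that, assuming DataFrames shaped like `userdf`, `ratingdf`, and `moviedf` from the article; the object name `GenderTop`, the function name, and its parameters are hypothetical. It also sorts again after the final join, because a join gives no guarantee about row order, and it breaks ties on `mid` so the result is deterministic.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object GenderTop {
  // users: columns uid, gender; ratings: columns uid, mid; movies: columns mid, title.
  def topMoviesByGender(users: DataFrame, ratings: DataFrame, movies: DataFrame,
                        gender: String, n: Int): DataFrame = {
    // Count ratings per movie among users of the requested gender, keep the top n.
    val counts = users
      .filter(col("gender") === gender)
      .select("uid")
      .join(ratings.select("uid", "mid"), "uid")
      .groupBy("mid")
      .count()
      .orderBy(col("count").desc, col("mid"))
      .limit(n)
    // Attach titles, then restore the ranking order lost by the join.
    counts
      .join(movies, "mid")
      .orderBy(col("count").desc, col("mid"))
      .select("title", "count")
  }
}
```

With the article's data, `GenderTop.topMoviesByGender(userdf, ratingdf, moviedf, "F", 10).show(false)` would print the ten titles together with their rating counts.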
    
    