Spark SQL Programs

Author: 羋学僧 | Published 2020-10-16 20:42

    Developing Spark SQL programs in IDEA

    1. Specifying the schema

    Data file: E:/RmDownloads/student.txt
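
    The code below splits each line on a single space and expects three fields per line: an integer id, a name, and an integer age. A student.txt consistent with that parsing could look like the following (sample rows made up for illustration):

    1 Tom 23
    2 Mary 24
    3 Lily 25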


    SparkSQLDemo1.scala
    //Create a DataFrame by specifying the schema explicitly
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object SparkSQLDemo1 {
      def main(args: Array[String]): Unit = {
        //Create the SparkSession
        val spark = SparkSession.builder().master("local").appName("SparkSQLDemo1").getOrCreate()

        //Read the input file and generate the corresponding RDD
        val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))

        //Create the schema with StructType
        val schema = StructType(
          List(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)
          )
        )

        //Map the RDD to an RDD of Rows
        val rowRDD = studentRDD.map(student => Row(student(0).toInt, student(1), student(2).toInt))

        //Generate the DataFrame
        val studentDF = spark.createDataFrame(rowRDD, schema)

        //Register the DataFrame as a table/view
        studentDF.createOrReplaceTempView("student")

        //Run the SQL query (show() prints the result, so no val is needed)
        spark.sql("select * from student").show()

        //Release resources
        spark.stop()
      }
    }
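
    The RDD[Row] detour above is the classic way to attach a schema; the same DataFrame can also be produced by handing the StructType straight to Spark's CSV reader with a space separator. A minimal sketch using the same path and schema (the object name SparkSQLDemo1Csv is made up for this example):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object SparkSQLDemo1Csv {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local").appName("SparkSQLDemo1Csv").getOrCreate()

        //Same schema as in SparkSQLDemo1
        val schema = StructType(List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)
        ))

        //"sep" tells the CSV reader that fields are separated by a space
        val studentDF = spark.read.schema(schema).option("sep", " ").csv("E:\\RmDownloads\\student.txt")
        studentDF.show()

        spark.stop()
      }
    }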
    
    

    2. Using a case class

    SparkSQLDemo2.scala

    //Infer the schema from a case class
    import org.apache.spark.sql.SparkSession

    object SparkSQLDemo2 {
      def main(args: Array[String]): Unit = {
        //Create the SparkSession
        val spark = SparkSession.builder().master("local").appName("SparkSQLDemo2").getOrCreate()

        //Read the input file and generate the corresponding RDD
        val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))

        //Associate the data RDD with the case class
        val dataRDD = studentRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))

        //To build a DataFrame from the RDD, import the implicit conversions
        import spark.implicits._

        val studentDF = dataRDD.toDF()

        //Register the table/view
        studentDF.createOrReplaceTempView("student")

        //Run the SQL query
        spark.sql("select * from student").show()

        //Release resources
        spark.stop()
      }
    }
    //The case class describes the schema
    case class Student(stuID: Int, stuName: String, stuAge: Int)
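
    Because the schema lives in the case class, the same implicits also give a strongly typed Dataset[Student] instead of an untyped DataFrame. A minimal sketch, assuming spark and dataRDD from SparkSQLDemo2 are in scope:

    import spark.implicits._

    //toDS() yields a Dataset[Student]; fields are accessed by name and checked at compile time
    val studentDS = dataRDD.toDS()
    studentDS.filter(s => s.stuAge > 20).show()

    //An existing DataFrame with matching column names converts the same way
    val typedDS = studentDS.toDF().as[Student]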
    

    3. Saving the data to a database

    SparkSQLDemo3.scala
    Add mysql-connector-java-5.1.40-bin.jar to the project's lib folder.
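
    If the project is built with sbt rather than by copying the jar into lib, the equivalent dependency (same driver version, standard Maven coordinates) goes into build.sbt:

    //build.sbt: MySQL JDBC driver, matching the jar version above
    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.40"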

    import java.util.Properties

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object SparkSQLDemo3 {

      def main(args: Array[String]): Unit = {
        //Create the SparkSession
        val spark = SparkSession.builder().master("local").appName("SparkSQLDemo3").getOrCreate()

        //Read the input file and generate the corresponding RDD
        val studentRDD = spark.sparkContext.textFile("E:\\RmDownloads\\student.txt").map(_.split(" "))

        //Create the schema with StructType
        val schema = StructType(
          List(
            StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)
          )
        )

        //Map the RDD to an RDD of Rows
        val rowRDD = studentRDD.map(student => Row(student(0).toInt, student(1), student(2).toInt))

        //Generate the DataFrame
        val studentDF = spark.createDataFrame(rowRDD, schema)

        //Register the DataFrame as a table
        studentDF.createOrReplaceTempView("students")

        //Run the SQL query
        val result = spark.sql("select * from students")

        //Display the result
        result.show()

        //Save the result to MySQL
        val mysqlprops = new Properties()

        // Class.forName("com.mysql.jdbc.Driver")

        mysqlprops.setProperty("user", "bigdata")
        mysqlprops.setProperty("password", "123456")

        result.write.jdbc("jdbc:mysql://bigdata02:3306/sqoopdb", "students", mysqlprops)

        //If the table already exists, write in append mode instead:
    //    result.write.mode("append").jdbc("jdbc:mysql://bigdata02:3306/sqoopdb","students",mysqlprops)

        //Stop the SparkSession
        spark.stop()
      }
    }
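
    To confirm the write, the table can be read back through the same JDBC connector. A minimal sketch, assuming spark and mysqlprops from SparkSQLDemo3 are in scope:

    //Read the table back from MySQL and display it
    val studentsFromDB = spark.read.jdbc("jdbc:mysql://bigdata02:3306/sqoopdb", "students", mysqlprops)
    studentsFromDB.show()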
    
    
