Spark SQL External Data Sources


Author: Aluha_f289 | Published 2020-06-02 21:19

    1. Reading JSON
    2. Reading CSV and TSV
    3. Reading SequenceFile
    4. Reading ObjectFile
    5. Reading data from HDFS
    6. Reading Parquet files
    7. Reading Hive and MySQL

    Reading JSON files

    import scala.util.parsing.json.JSON
    import org.apache.spark.{SparkConf, SparkContext}

    object chapter4_3_2 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]")
          .setAppName(this.getClass.getName)
        val sc = new SparkContext(conf)
        val inputJsonFile = sc.textFile("D:\\studyplace\\sparkBook\\chapter4\\data\\chapter4_3_2.json")

        // Parse each line as JSON; parseFull returns Option[Any]
        val content = inputJsonFile.map(JSON.parseFull)

        println(content.collect.mkString(","))
        // Iterate over the parse results
        content.foreach(
          {
            case Some(map: Map[String, Any]) => println(map)
            case None => println("Invalid JSON")
            case _ => println("Other cases...")
          }
        )
        sc.stop()
      }
    }
    

    Note: each record in the JSON file must be a complete JSON string on its own line, because sc.textFile reads the file line by line and JSON.parseFull is applied to each line.
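    Because the topic here is Spark SQL external data sources, the same file can also be loaded directly as a DataFrame. A minimal sketch, assuming the same one-JSON-object-per-line file as above (the object name is just for illustration):

    import org.apache.spark.sql.SparkSession

    object JsonDataFrameSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("JsonDataFrameSketch")
          .master("local[*]")
          .getOrCreate()

        // spark.read.json also expects one complete JSON object per line
        val jsonDF = spark.read.json("D:\\studyplace\\sparkBook\\chapter4\\data\\chapter4_3_2.json")

        jsonDF.printSchema()
        jsonDF.show()

        spark.stop()
      }
    }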

    Reading CSV and TSV files

    A CSV file uses a comma as the field delimiter and a TSV file uses a tab; a fuller sketch follows the snippet below.

    val inputFile = sc.textFile("file path")
    // For CSV split on ","; for TSV split on "\t"
    inputFile.flatMap(_.split("delimiter"))
    
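    A slightly fuller sketch of the same idea, assuming a hypothetical comma-separated file people.csv with name and age columns and no header line (for TSV, split on "\t" instead):

    import org.apache.spark.{SparkConf, SparkContext}

    object CsvReadSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("CsvReadSketch")
        val sc = new SparkContext(conf)

        // Hypothetical file with lines such as "Alice,30"
        val lines = sc.textFile("D:\\studyplace\\sparkBook\\chapter4\\data\\people.csv")

        // map (rather than flatMap) keeps one record per line
        val records = lines.map(_.split(","))
          .map(fields => (fields(0), fields(1).toInt))

        records.foreach(println)
        sc.stop()
      }
    }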

    Reading SequenceFile

    Only key-value data can be stored in the SequenceFile format, analogous to a Map in Java or a Tuple2 in Scala.
    A SequenceFile can compress individual records or entire blocks; compression is disabled by default.

    val inputFile = sc.sequenceFile[String,String]("file path")
    

    The type parameters are the data types of the key and the value being read; a write-and-read sketch follows.
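    For completeness, a minimal sketch of writing and then reading a SequenceFile, assuming a hypothetical local output directory; the optional codec argument shows how compression can be turned on:

    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.spark.{SparkConf, SparkContext}

    object SequenceFileSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("SequenceFileSketch")
        val sc = new SparkContext(conf)

        // Only key-value pairs can be stored in a SequenceFile
        val pairs = sc.parallelize(Seq(("spark", "1"), ("hadoop", "2")))

        // Write as a SequenceFile; passing a codec enables compression (off by default)
        pairs.saveAsSequenceFile("D:\\tmp\\seqfile_demo", Some(classOf[GzipCodec]))

        // Read it back; the type parameters are the key and value types
        val loaded = sc.sequenceFile[String, String]("D:\\tmp\\seqfile_demo")
        loaded.collect.foreach(println)

        sc.stop()
      }
    }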

    Reading data in ObjectFile format

    Spark can read Object-format data into an RDD, and every element of the RDD can be restored to the original object.
    First define a class:

    package chapter4
    
    case class Person(name: String, age: Int)
    

    Reading the data:

    import chapter4.Person
    import org.apache.spark.{SparkConf, SparkContext}
    object chapter4_3_5 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName(this.getClass.getName)
          .setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rddData = sc.objectFile[Person]("D:\\studyplace\\sparkBook\\chapter4\\data\\chapter4_3_5.object")
        println(rddData.collect.toList)
        sc.stop()
      }
    }
    

    Objects are serialized together with their original information, including the package name, so the type parameter Person must refer to exactly the same class that was written.
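    The object file read above has to be produced first; a minimal sketch of the write side with saveAsObjectFile, assuming the same path (the output directory must not already exist):

    import chapter4.Person
    import org.apache.spark.{SparkConf, SparkContext}

    object ObjectFileWriteSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("ObjectFileWriteSketch")
        val sc = new SparkContext(conf)

        // Person objects are serialized with their full class information, including the package name
        val people = sc.parallelize(Seq(Person("Tom", 20), Person("Jerry", 18)))
        people.saveAsObjectFile("D:\\studyplace\\sparkBook\\chapter4\\data\\chapter4_3_5.object")

        sc.stop()
      }
    }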

    Reading data from HDFS (explicitly using the Hadoop API)

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat // new-API input format, required by newAPIHadoopFile
    import org.apache.spark.{SparkConf, SparkContext}
    
    object chapter4_3_6 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("chapter4_3_6")
        val sc = new SparkContext(conf)
    
        val path = "hdfs://ip:8020/path"  // replace with an actual HDFS path
        val inputHadoopFile = sc.newAPIHadoopFile[LongWritable,Text,TextInputFormat](path)
    
        val result = inputHadoopFile.map(_._2.toString).collect()
        println(result.mkString(","))
        sc.stop()
      }
    }
    

    For newAPIHadoopFile[LongWritable,Text,TextInputFormat], the first type parameter, LongWritable, is the byte offset at which Hadoop reads each line, Text is the content of the line at that offset, and TextInputFormat is the input format that splits the file into records.
    Calling inputHadoopFile.collect.mkString(",") directly throws a serialization error:
    Writable subtypes (LongWritable, IntWritable, Text) are not serializable, so they must first be converted, for example with inputHadoopFile.map(_._2.toString).
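    If both the offset and the line content are needed, convert both Writables to serializable Scala types before collecting. A minimal sketch along the same lines as the code above (the HDFS path is still a placeholder):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object HdfsOffsetSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("HdfsOffsetSketch")
        val sc = new SparkContext(conf)

        val path = "hdfs://ip:8020/path"  // placeholder, as above
        val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](path)

        // LongWritable -> Long and Text -> String, so the result can be collected safely
        val offsetAndLine = rdd.map { case (offset, line) => (offset.get(), line.toString) }
        println(offsetAndLine.collect().mkString(","))

        sc.stop()
      }
    }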

    Reading data from MySQL

    Add the dependency:

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.40</version>
    </dependency>
    
    package chapter4
    
    import java.sql.DriverManager
    
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object chapter4_3_7 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("chapter4_3_7").setMaster("local[*]")
        val sc = new SparkContext(conf)
    
        val inputMysql = new JdbcRDD(sc, () => {
          Class.forName("com.mysql.jdbc.Driver")
          DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?" +
            "useUnicode=true&characterEncoding=utf-8", "root", "123456")
        },
          "select * from person where id >= ? and id <= ?;",
          1,  // lower bound of the query range
          3,  // upper bound of the query range
          1,  // number of partitions
          r => (r.getInt(1), r.getString(2), r.getInt(3)))
    
        println("查询到的记录条目数:"+inputMysql.count)
        inputMysql.foreach(println)
        sc.stop()
      }
    }
    
    
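    Since the article is about Spark SQL, the same query can also be expressed with the DataFrame JDBC source, which the Hive-and-MySQL section below uses as well. A minimal sketch, assuming the same database, table and credentials as above:

    import org.apache.spark.sql.SparkSession

    object JdbcDataFrameSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("JdbcDataFrameSketch")
          .master("local[*]")
          .getOrCreate()

        // DataFrame equivalent of the JdbcRDD query above
        val personDF = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/spark?useUnicode=true&characterEncoding=utf-8")
          .option("dbtable", "person")
          .option("user", "root")
          .option("password", "123456")
          .option("driver", "com.mysql.jdbc.Driver")
          .load()

        personDF.filter("id >= 1 and id <= 3").show()
        spark.stop()
      }
    }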

    Working with Parquet files

    package com.imooc.spark
    
    import org.apache.spark.sql.SparkSession
    
    /**
     * Parquet file operations
     */
    object ParquetApp {
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder().appName("SparkSessionApp")
          .master("local[2]").getOrCreate()
    
    
        /**
     * spark.read.format("parquet").load is the standard way to read Parquet
         */
        val userDF = spark.read.format("parquet").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
    
        userDF.printSchema()
        userDF.show()
    
        userDF.select("name","favorite_color").show
    
        userDF.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/jsonout")
    
        spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").show
    
        // This will fail: Spark SQL's default format is Parquet, and people.json is not a Parquet file
        spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show
    
        spark.read.format("parquet").option("path","file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").load().show
        spark.stop()
      }
    
    }
    
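    The example above writes JSON; writing Parquet works the same way through the DataFrameWriter. A minimal sketch, assuming a hypothetical output directory /home/hadoop/tmp/parquetout:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object ParquetWriteSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("ParquetWriteSketch")
          .master("local[2]").getOrCreate()

        val userDF = spark.read.format("parquet")
          .load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")

        // Write selected columns back out as Parquet, overwriting the hypothetical output directory
        userDF.select("name", "favorite_color")
          .write.mode(SaveMode.Overwrite)
          .format("parquet")
          .save("file:///home/hadoop/tmp/parquetout")

        spark.stop()
      }
    }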

    Reading from Hive and MySQL

    package com.imooc.spark
    
    import org.apache.spark.sql.SparkSession
    
    /**
     * Query Hive and MySQL table data together through external data sources
     */
    object HiveMySQLApp {
    
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("HiveMySQLApp")
          .master("local[2]").getOrCreate()
    
        // Load the Hive table
        val hiveDF = spark.table("emp")
    
        // Load the MySQL table over JDBC
        val mysqlDF = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306")
          .option("dbtable", "spark.DEPT")
          .option("user", "root")
          .option("password", "root")
          .option("driver", "com.mysql.jdbc.Driver")
          .load()
    
        // JOIN
        val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
        resultDF.show
    
    
        resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
          mysqlDF.col("deptno"), mysqlDF.col("dname")).show
    
        spark.stop()
      }
    
    }
    
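    The same join can also be written in SQL by registering the MySQL table as a temporary view; a minimal sketch under the same Hive and MySQL assumptions as above:

    import org.apache.spark.sql.SparkSession

    object HiveMySQLSqlSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("HiveMySQLSqlSketch")
          .master("local[2]")
          .enableHiveSupport() // needed for SQL over the Hive table emp
          .getOrCreate()

        // Expose the MySQL table as a temporary view so the join can be written in SQL
        spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306")
          .option("dbtable", "spark.DEPT")
          .option("user", "root")
          .option("password", "root")
          .option("driver", "com.mysql.jdbc.Driver")
          .load()
          .createOrReplaceTempView("dept")

        spark.sql(
          "select e.empno, e.ename, d.deptno, d.dname from emp e join dept d on e.deptno = d.deptno"
        ).show()

        spark.stop()
      }
    }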

    Reference: http://www.mamicode.com/info-detail-2214729.html
