美文网首页大数据学习
Spark Sql外部数据源

Spark Sql外部数据源

作者: TZX_0710 | 来源:发表于2020-03-17 20:13 被阅读0次

    Spark支持以下六个核心数据源,同时Spark提供了上百种数据源的读取方式。
    CSV、JSON、Parquet、ORC、JDBC/ODBC、Plain-text files
    SparkSql的StructField的dataType取值类型如下:
    ArrayType, BinaryType, BooleanType, CalendarIntervalType, DateType, HiveStringType, MapType, NullType, NumericType, ObjectType, StringType, StructType, TimestampType
    下图所需要的资源文件可以到百度云盘进行下载
    链接:https://pan.baidu.com/s/196haN0lWD9Px3MoJ6ybXww
    提取码:sgfs

    1. 读数据格式如下:
    spark.read.format("csv")
    .option("mode","FAILFAST")//读取模式
    .option("inferSchema","true")//是否自动推断schema
    .option("path","filePath")//文件路径
    .schema(schema)//使用预先定义的schema
    .load()
    读取模式取值有3个取值:
    1.permissive  当遇到损坏的记录时,将其所有字段设置为null,并将所有的损坏记录放在名为_conrruption
    2. dropMalformed 删除格式不正确的行
    3. failfast 遇到格式不正确的数据立即失败
    

    2.写数据格式

    dataframe.write.format("csv")//文件格式
    .mode("OVERWRITE")//写模式
    .option("dateFormat","yyyy-MM-dd")//日期格式
    .option("path","filePath")//文件地址  
    .save() 
    写模式有4种可选项:
    1.ErrorIfExists  如果给定的路径已经存在文件,则抛出异常。默认的模式
    2. Append 数据以追加的方式写入
    3. Ovverwrite 数据以覆盖的方式写入
    4. Ignore  如果给定的路径已经存在文件,则忽略。
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object spark_sqlsource {
    
      def main(args: Array[String]): Unit = {
        //spark数据源读取
       var spark=SparkSession.builder().appName("sparkSqlSource").master("local").getOrCreate()
       val dataFrame=spark.read.format("csv")
          .option("header","false")//文件的第一行是否为列的名称
          .option("mode","FAILFAST")//是否快速失败
          .option("inferSchema","true")//自动推断schema
          .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.csv")
         .show()
        //预定义类型
        val deptSchema=StructType(Array(
          StructField("deptNo",LongType,nullable = false),
          StructField("dname",StringType,nullable = true),
          StructField("loc",StringType,nullable = true),
          StructField("dateTime",DateType,nullable = true)))
    
       val df=spark.read.format("csv")
         .option("mode","failfast")
          .schema(deptSchema)
          .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.csv")
    
        df.show()
    
      //写入csv文件
        df.write.format("csv")
        .mode("OVERWRITE")//如果存在则重写
          //日期数据格式化写入
        .option("dateFormat","yyyy")
        .save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept2")
    
        //读取json文件
        val dfjson=spark.read.format("json")
          .option("mode","failfast")
          .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
        //写入json文件
        dfjson.write.format("json")
          .mode(saveMode = "overwrite")
          .save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept2")
    
        //读取parquet文件 parquet是一个开源的面向列的存储数据
        spark.read.format("parquet")
          .option("mode","failfast")
          .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.parquet")
          .show()
        //写入parquet文件
        dfjson.write.format("parquet")
          .mode("overwrite")
          .save("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\emp")
        //链接jdbc查询数据
        val sqlQuery= "(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"
        spark.read.format("jdbc")
          .option("url","jdbc:mysql://127.0.0.1:3306/mysql")
          .option("driver","com.mysql.jdbc.Driver")
          .option("user","root")
          .option("password","root")
          .option("dbtable",sqlQuery)//查询sql语句
          .option("numPartitions","10")//设置并行度
          .load()
          .show()
        //写入数据 向jdbc 并且创建表
       val djson= spark.read.format("json").option("mode","failfast")
          .load("C:\\Users\\reality\\Desktop\\Api\\spark_demo\\src\\dept.json")
        djson.write.format("jdbc")
          .mode("append")
          .option("url","jdbc:mysql://127.0.0.1:3306/mysql")
          .option("driver","com.mysql.jdbc.Driver")
          .option("user","root")
          .option("password","root")
          .option("dbtable","emp")
          .save()
      }
    }
    //写入文件的时候可能会报错
     (null) entry in command string: null chmod 0644
    解决方案:把hadoop.dll  拷贝至C:\\windows\\system32目录下
    

    image.png
    写入文件

    option参数配置表:

    csv配置表
    CSV配置表
    json配置表
    jdbc配置表

    sparksql也有正常数据库操作的一些常用函数、如count、avg、distinct
    当然也可以自定义函数 ,包括一些左连接等查询 下面是参考资料
    spark-sql函数
    spark-sql联接查询

    相关文章

      网友评论

        本文标题:Spark Sql外部数据源

        本文链接:https://www.haomeiwen.com/subject/kpioehtx.html