Spark-SQL

作者: 安申 | 来源:发表于2020-05-13 16:58 被阅读0次

    1.在新版本中,SparkSession是Spark最新的SQL查询起点,实质上是SQLContext和HiveContext的组合。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

    2.创建DataFrame有三种方式

    (1)通过Spark的数据源进行创建;

    val df:DataFrame = spark.read.csv("./temp/aaa.csv")

    (2)从一个存在的RDD进行转换;

    注意:如果需要RDD与DF或者DS之间操作,那么都需要引入import spark.implicits._  (spark不是包名,而是sparkSession对象的名称)

    (3)还可以从Hive Table进行查询返回。

    val frame1:DataFrame = spark.table("tablename")

    3.代码示例:

    (1)添加依赖:

    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-sql_2.11</artifactId>

        <version>2.1.1</version>

    </dependency>

    (2)object HelloWorld {

                  def main(args: Array[String]) {

                    //创建SparkConf()并设置App名称

                     val spark = SparkSession.builder().appName("HelloWorld").master("local[*]").getOrCreate()

                    //导入隐式转换

                      import spark.implicits._

                    //读取本地文件,创建DataFrame

                    val df =spark.read.json("examples/src/main/resources/people.json")

                    //打印

                    df.show()

                    //DSL风格:查询年龄在21岁以上的

                    df.filter($"age"> 21).show()

                    //创建临时表

                    df.createOrReplaceTempView("persons")

                    //SQL风格:查询年龄在21岁以上的

                    spark.sql("SELECT * FROM persons where age > 21").show()

                    //关闭连接

                    spark.stop()

        }

    }

    4.Spark SQL数据的加载与保存

    加载数据:spark.read.文件格式

    保存数据:df.write.文件格式

    5.Spark SQL通过JDBC连接MySQL

    Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

    定义JDBC相关参数配置信息

    val connectionProperties = new Properties()

    connectionProperties.put("user","root")

    connectionProperties.put("password","000000")

    //使用read.jdbc加载数据

    val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd","rddtable", connectionProperties)

    //使用write.jdbc保存数据

    jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/mysql","db", connectionProperties)

    Spark SQL相关语法

    1.filter(condition):根据字段进行筛选

    和where使用条件相同

    jdbcDF .filter("id = 1 or c1 = 'b'" ).show()

    即:过滤出来满足condition条件的,注意,是满足条件的数据被过滤出来

    2.selectExpr:对指定字段进行特殊处理

    可以直接对指定字段调用UDF函数,或者指定别名等。即,对传入的字段进行特殊处理,可以转换形式,取别名等。

    示例,查询id字段,c3字段取别名time,c4字段四舍五入:

    jdbcDF .selectExpr("id" , "c3 as time" , "round(c4)" ).show()

    ps:众所周知,取别名的时候,可以用as,也可以省略。即:"c3 as time","c3 time"都对

    3.col:获取指定字段

    只能获取一个字段,返回对象为Column类型。

    val idCol = jdbcDF.col(“id”)

    4.drop:去除指定字段保留其他字段

    返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。

    示例:jdbcDF.drop("id")

    5.还有就是两个DS或DF进行join,其实最后的结果是两个DS或DF的所有字段

    代码:

    val spark = SparkSession.builder().appName("JoinDemo").master("local[*]").getOrCreate()

    val context=spark.sqlContext

    val data1=context.createDataFrame(List(("b","Bob",36))).toDF("id","name","age")

    val data2 = context.createDataFrame(List(("staff","Bob",66666))).toDF("job","name","salary")

    data1.join(data2,data1.col("name").equalTo(data2.col("name"))).show()

    结果:

    +---+----+---+-----+----+------+

    | id|name|age|  job|name|salary|

    +---+----+---+-----+----+------+

    |  b| Bob| 36|staff| Bob| 66666|

    +---+----+---+-----+----+------+

    相关文章

      网友评论

          本文标题:Spark-SQL

          本文链接:https://www.haomeiwen.com/subject/cmpnnhtx.html