Spark Development: An Introduction to SQL

Author: 诺之林 | Published: 2021-04-23 22:06

    This article builds on Spark Architecture

    Introduction

    • Many early big data developers came from web development, and SQL is a core skill for web developers

    • Spark SQL provides the DataFrame to simplify development on top of RDDs

    Definition

    • DataFrame = a distributed dataset built on top of RDDs

    • DataFrame = RDD + Schema
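
The two halves of that definition can be inspected directly. The following is a minimal sketch, assuming it runs inside spark-shell (where `spark` is already defined); the sample rows and the names `sample` and `rows` are illustrative and not taken from the article.

```scala
// Assumes spark-shell, where `spark` (a SparkSession) already exists.
// Hypothetical sample rows; column names are attached with toDF.
val sample = spark.createDataFrame(Seq(("Michael", 29L), ("Andy", 30L))).toDF("name", "age")

// The schema half: column names and their types.
sample.printSchema()

// The RDD half: the same rows exposed as an RDD[Row].
val rows = sample.rdd
println(rows.count())
```

`printSchema()` shows the structure that a plain RDD does not carry, while `.rdd` exposes the underlying distributed rows.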

    Features

    Integrating RDDs with SQL

    cat /opt/services/spark/examples/src/main/resources/people.txt
    # Michael, 29
    # Andy, 30
    # Justin, 19
    
    /opt/services/spark/bin/spark-shell
    
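    // Each line of people.txt becomes one People instance
    case class People(name: String, age: Long)
    
    val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")
    
    // Split "name, age" on the comma and trim the space before the age
    val mapRDD = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toLong))
    
    val filterRDD = mapRDD.filter(_.age > 20)
    
    // collect() brings the rows to the driver so they print in the shell
    filterRDD.collect().foreach(p => println(s"${p.name} ${p.age}"))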
    

    For more on Scala string interpolation, see Scala String Interpolation

    case class People(name: String, age: Long)
    
    val rdd = sc.textFile("/opt/services/spark/examples/src/main/resources/people.txt")
    
    // spark-shell imports spark.implicits._ automatically; a standalone application must import it for toDF()
    val df = rdd.map(_.split(",")).map(attributes => People(attributes(0), attributes(1).trim.toLong)).toDF()
    
    df.createOrReplaceTempView("people")
    
    spark.sql("SELECT * FROM people WHERE age > 20").show()
    
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    +-------+---+
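
The same filter can also be written with the DataFrame API instead of a SQL string. This is a minimal sketch, assuming spark-shell; the rows are rebuilt in memory so the snippet stands alone, and the names `peopleDF` and `adults` are illustrative.

```scala
import org.apache.spark.sql.functions.col

// Assumes spark-shell, where `spark` already exists.
// In-memory copy of the three people.txt rows, so no file is needed here.
val peopleDF = spark
  .createDataFrame(Seq(("Michael", 29L), ("Andy", 30L), ("Justin", 19L)))
  .toDF("name", "age")

// DataFrame API counterpart of: SELECT * FROM people WHERE age > 20
val adults = peopleDF.filter(col("age") > 20)
adults.show()
```

Both forms go through the same Spark SQL engine, so the choice between a SQL string and the DataFrame API is largely a matter of preference.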
    

    Unified data access

    cat /opt/services/spark/examples/src/main/resources/people.json
    # {"name":"Michael"}
    # {"name":"Andy", "age":30}
    # {"name":"Justin", "age":19}
    
    /opt/services/spark/bin/spark-shell
    
    val df = spark.read.json("/opt/services/spark/examples/src/main/resources/people.json")
    
    df.createOrReplaceTempView("people")
    
    spark.sql("SELECT * FROM people WHERE age > 20").show()
    
    +---+----+
    |age|name|
    +---+----+
    | 30|Andy|
    +---+----+
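
To illustrate what "unified" means here, the sketch below round-trips rows through Parquet and queries them with the same SQL. It assumes spark-shell with the local filesystem as the default filesystem; the sample rows, the temporary directory, and the view name `people_parquet` are illustrative and not from the article.

```scala
import java.nio.file.Files

// Assumes spark-shell, where `spark` already exists.
// Hypothetical in-memory rows standing in for people.json.
val source = spark.createDataFrame(Seq(("Andy", 30L), ("Justin", 19L))).toDF("name", "age")

// Write the rows as Parquet into a temporary directory (illustrative path).
val parquetPath = Files.createTempDirectory("people-parquet").toString
source.write.mode("overwrite").parquet(parquetPath)

// Read them back through the same reader API; only the format changes.
val parquetDF = spark.read.parquet(parquetPath)
parquetDF.createOrReplaceTempView("people_parquet")

val result = spark.sql("SELECT * FROM people_parquet WHERE age > 20")
result.show()
```

Only the `spark.read` call differs between the JSON and Parquet cases; registering the view and running the query are identical.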
    

    Standard data connectivity

    docker run --name spark-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.17
    
    docker exec -it spark-mysql /bin/bash
    
    mysql -uroot -p123456
    
    CREATE DATABASE IF NOT EXISTS db_spark DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
    
    USE db_spark;
    
    CREATE TABLE users (
      id int(10) unsigned NOT NULL AUTO_INCREMENT,
      name varchar(20) DEFAULT NULL COMMENT 'user name',
      PRIMARY KEY (`id`)
    );
    
    INSERT INTO users VALUES (1, 'XiaoWang');
    
    INSERT INTO users VALUES (2, 'XiaoMing');
    
    # cd /opt/services
    # wget https://mirror.tuna.tsinghua.edu.cn/mysql/downloads/Connector-J/mysql-connector-java-5.1.49.tar.gz
    # tar xf mysql-connector-java-5.1.49.tar.gz
    /opt/services/spark/bin/spark-shell --jars /opt/services/mysql-connector-java-5.1.49/mysql-connector-java-5.1.49-bin.jar
    
    // Type :paste first so the multi-line expression below is read as one statement
    val df = spark.read.format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/db_spark")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", "root")
        .option("password", "123456")
        .option("dbtable", "users")
        .load()
    // Press Ctrl + D to leave paste mode and run the block
    
    df.createOrReplaceTempView("users")
    
    val sqlDF = spark.sql("SELECT * FROM users WHERE name = 'XiaoMing'")
    
    sqlDF.show()
    
    +---+--------+
    | id|    name|
    +---+--------+
    |  2|XiaoMing|
    +---+--------+
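
As an alternative to chaining `.option(...)` calls, `DataFrameReader` also has a `jdbc(url, table, properties)` method. The sketch below reuses the connection details from above; the names `jdbcUrl`, `connProps`, and the helper `loadUsers` are hypothetical, and actually calling the helper assumes the MySQL container is running and the connector jar was passed via `--jars`.

```scala
import java.util.Properties
import org.apache.spark.sql.DataFrame

// Assumes spark-shell, where `spark` already exists.
// Connection details copied from the example above.
val jdbcUrl = "jdbc:mysql://localhost:3306/db_spark"
val connProps = new Properties()
connProps.setProperty("user", "root")
connProps.setProperty("password", "123456")
connProps.setProperty("driver", "com.mysql.jdbc.Driver")

// Hypothetical helper: loads the users table only when it is called,
// so defining it does not require a live database.
def loadUsers(): DataFrame = spark.read.jdbc(jdbcUrl, "users", connProps)

// With the database reachable, this would mirror the SQL query above:
// loadUsers().filter("name = 'XiaoMing'").show()
```

Keeping the connection settings in one `Properties` object makes them easier to reuse when several tables are loaded from the same database.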
    
