美文网首页
Spark SQL 操作MySQL

Spark SQL 操作MySQL

作者: 歌哥居士 | 来源:发表于2019-03-28 08:47 被阅读0次
准备数据
mysql> create database if not exists testdb default charset utf8;
mysql> use testdb;
mysql> create table testTable(
    id integer not null,
    name varchar(20) not null,
    age integer not null
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
mysql> insert into testTable values(1,'baozi',10),(2,'baozi2',20),(3,'baozi3',30);
mysql> select * from testTable;
+----+--------+-----+
| id | name   | age |
+----+--------+-----+
|  1 | baozi  |  10 |
|  2 | baozi2 |  20 |
|  3 | baozi3 |  30 |
+----+--------+-----+
如果是在idea中开发需要添加pom,如果是spark-shell中就不需要了,因为spark/jars中放了mysql的jar包
<!-- mysql驱动连接 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.37</version>
</dependency>

方式1

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object MySQLApp {
  def main(args: Array[String]): Unit = {
    // ---------------------------
    val spark = SparkSession.builder()
      .master("local[2]")
      .config(new SparkConf().set("spark.driver.host", "localhost"))
      .getOrCreate()


    // 1. 数据源 -> DataFrame
    val jdbcDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306?useSSL=false")
      .option("dbtable", "testdb.testTable")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // 2. 操作DataFrame
    jdbcDF.show()


    // 3. DataFrame -> 数据源
    jdbcDF.write.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306?useSSL=false")
      .option("dbtable", "testdb.testTable2") // 写出到新的表
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .save()
    

    // ---------------------------
    spark.stop()
  }
}

方式2

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object MySQLApp {
  def main(args: Array[String]): Unit = {
    // ---------------------------
    val spark = SparkSession.builder()
      .master("local[2]")
      .config(new SparkConf().set("spark.driver.host", "localhost"))
      .getOrCreate()


    // 1. 数据源 -> DataFrame
    val connProperties = new Properties()
    connProperties.put("user", "root")
    connProperties.put("password","root")
    connProperties.put("driver","com.mysql.jdbc.Driver")
    val jdbcDF = spark.read.jdbc(
      "jdbc:mysql://localhost:3306?useSSL=false",
      "testdb.testTable",
      connProperties
    )

    // 2. 操作DataFrame
    jdbcDF.show()

    // 3. DataFrame -> 数据源
    jdbcDF.write.jdbc(
      "jdbc:mysql://localhost:3306?useSSL=false",
      "testdb.testTable3", // 同样是写出到新的表
      connProperties
    )

    // ---------------------------
    spark.stop()
  }
}

方式3

// 1. 创建临时表
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
    url "jdbc:mysql://localhost:3306?useSSL=false",
    dbtable "testdb.testTable",
    user 'root',
    password 'root',
    driver 'com.mysql.jdbc.Driver'
);

// 2. 操作
spark-sql> select * from jdbcTable;

// 3. 插入新数据(这里是插入新数据到表中,而上面两个例子是写入到一个新的表中)
spark-sql> insert into jdbcTable values(4,'baozi4',40);

相关文章

网友评论

      本文标题:Spark SQL 操作MySQL

      本文链接:https://www.haomeiwen.com/subject/yemcvqtx.html