Prepare the data
mysql> create database if not exists testdb default charset utf8;
mysql> use testdb;
mysql> create table testTable(
         id integer not null,
         name varchar(20) not null,
         age integer not null
       ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mysql> insert into testTable values(1,'baozi',10),(2,'baozi2',20),(3,'baozi3',30);
mysql> select * from testTable;
+----+--------+-----+
| id | name   | age |
+----+--------+-----+
|  1 | baozi  |  10 |
|  2 | baozi2 |  20 |
|  3 | baozi3 |  30 |
+----+--------+-----+
If you are developing in IDEA, you need to add the following dependency to your pom.xml; in spark-shell it is not needed, because the MySQL driver jar is already placed under spark/jars.
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.37</version>
</dependency>
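Note: these coordinates are for MySQL Connector/J 5.1, which matches the com.mysql.jdbc.Driver class used in the code below. If you use Connector/J 8.x instead, the driver class is com.mysql.cj.jdbc.Driver.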
Approach 1
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object MySQLApp {
  def main(args: Array[String]): Unit = {
    // ---------------------------
    val spark = SparkSession.builder()
      .master("local[2]")
      .config(new SparkConf().set("spark.driver.host", "localhost"))
      .getOrCreate()

    // 1. Data source -> DataFrame
    val jdbcDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306?useSSL=false")
      .option("dbtable", "testdb.testTable")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // 2. Operate on the DataFrame
    jdbcDF.show()

    // 3. DataFrame -> data source
    jdbcDF.write.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306?useSSL=false")
      .option("dbtable", "testdb.testTable2") // write out to a new table
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .save()
    // ---------------------------

    spark.stop()
  }
}
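One thing to watch with save(): the default save mode is ErrorIfExists, so running the job a second time fails because testdb.testTable2 already exists. A minimal sketch of switching the save mode, reusing the jdbcDF and connection options from above:

import org.apache.spark.sql.SaveMode

// Append to the existing table instead of failing on a rerun.
jdbcDF.write
  .mode(SaveMode.Append) // alternatives: Overwrite, Ignore, ErrorIfExists (default)
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306?useSSL=false")
  .option("dbtable", "testdb.testTable2")
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.jdbc.Driver")
  .save()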
Approach 2
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object MySQLApp {
  def main(args: Array[String]): Unit = {
    // ---------------------------
    val spark = SparkSession.builder()
      .master("local[2]")
      .config(new SparkConf().set("spark.driver.host", "localhost"))
      .getOrCreate()

    // 1. Data source -> DataFrame
    val connProperties = new Properties()
    connProperties.put("user", "root")
    connProperties.put("password", "root")
    connProperties.put("driver", "com.mysql.jdbc.Driver")

    val jdbcDF = spark.read.jdbc(
      "jdbc:mysql://localhost:3306?useSSL=false",
      "testdb.testTable",
      connProperties
    )

    // 2. Operate on the DataFrame
    jdbcDF.show()

    // 3. DataFrame -> data source
    jdbcDF.write.jdbc(
      "jdbc:mysql://localhost:3306?useSSL=false",
      "testdb.testTable3", // again, write out to a new table
      connProperties
    )
    // ---------------------------

    spark.stop()
  }
}
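spark.read.jdbc also has a partitioned overload that splits the read into parallel tasks along a numeric column. A sketch reusing connProperties, with bounds chosen to match the sample data above (ids 1 to 3):

// Read testdb.testTable in 2 partitions, split on the numeric id column.
val partitionedDF = spark.read.jdbc(
  "jdbc:mysql://localhost:3306?useSSL=false",
  "testdb.testTable",
  "id", // partition column
  1L,   // lowerBound
  3L,   // upperBound
  2,    // numPartitions
  connProperties
)
partitionedDF.show()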
Approach 3
-- 1. Create a temporary view
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://localhost:3306?useSSL=false",
  dbtable "testdb.testTable",
  user "root",
  password "root",
  driver "com.mysql.jdbc.Driver"
);

-- 2. Query it
spark-sql> select * from jdbcTable;

-- 3. Insert new rows (here the rows go into the existing table, whereas the two approaches above write to a brand-new table)
spark-sql> insert into jdbcTable values(4,'baozi4',40);
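The same temporary view can also be created from Scala code with spark.sql, so this approach is not limited to the spark-sql shell. A minimal sketch, assuming the SparkSession from the earlier examples:

spark.sql(
  """
    |CREATE TEMPORARY VIEW jdbcTable
    |USING org.apache.spark.sql.jdbc
    |OPTIONS (
    |  url "jdbc:mysql://localhost:3306?useSSL=false",
    |  dbtable "testdb.testTable",
    |  user "root",
    |  password "root",
    |  driver "com.mysql.jdbc.Driver"
    |)
  """.stripMargin)
spark.sql("select * from jdbcTable").show()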