1. Reading a JDBC data source with Spark SQL in IDEA
First, take a look at the data in MySQL:
mysql> use test;
mysql> create table emp(empno int, ename varchar(100),job varchar(100),mgr int, hiredate varchar(100), sal double, comm double, deptno int);
mysql> load data infile '/usr/local/mysql/data/emp.txt' replace into table emp fields terminated by '\t';
mysql> create table dept(deptno int, dname varchar(100),loc varchar(100));
mysql> load data infile '/usr/local/mysql/data/dept.txt' replace into table dept fields terminated by '\t';
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| dept |
| emp |
| user |
+----------------+
3 rows in set (0.00 sec)
mysql> select * from emp;
+-------+--------+-----------+------+------------+-------+------+--------+
| empno | ename | job | mgr | hiredate | sal | comm | deptno |
+-------+--------+-----------+------+------------+-------+------+--------+
| 7369 | SMITH | CLERK | 7902 | 1980-12-17 | 800 | 0 | 20 |
| 7499 | ALLEN | SALESMAN | 7698 | 1981-2-20 | 1600 | 300 | 30 |
| 7521 | WARD | SALESMAN | 7698 | 1981-2-22 | 1250 | 500 | 30 |
| 7566 | JONES | MANAGER | 7839 | 1981-4-2 | 2975 | 0 | 20 |
| 7654 | MARTIN | SALESMAN | 7698 | 1981-9-28 | 1250 | 1400 | 30 |
| 7698 | BLAKE | MANAGER | 7839 | 1981-5-1 | 2850 | 0 | 30 |
| 7782 | CLARK | MANAGER | 7839 | 1981-6-9 | 2450 | 0 | 10 |
| 7788 | SCOTT | ANALYST | 7566 | 1987-4-19 | 3000 | 0 | 20 |
| 7839 | KING | PRESIDENT | 0 | 1981-11-17 | 5000 | 0 | 10 |
| 7844 | TURNER | SALESMAN | 7698 | 1981-9-8 | 1500 | 0 | 30 |
| 7876 | ADAMS | CLERK | 7788 | 1987-5-23 | 1100 | 0 | 20 |
| 7900 | JAMES | CLERK | 7698 | 1981-12-3 | 950 | 0 | 30 |
| 7902 | FORD | ANALYST | 7566 | 1981-12-3 | 3000 | 0 | 20 |
| 7934 | MILLER | CLERK | 7782 | 1982-1-23 | 1300 | 0 | 10 |
| 8888 | HIVE | PROGRAM | 7839 | 1988-1-23 | 10300 | 0 | NULL |
+-------+--------+-----------+------+------------+-------+------+--------+
15 rows in set (0.00 sec)
The code in IDEA is as follows:
import org.apache.spark.sql.SparkSession

object ExtDSApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("ExtDSApp")
      .master("local[2]")
      .getOrCreate()

    // Read the emp table from MySQL through the JDBC data source
    val emp = spark.read.format("jdbc")
      .options(Map(
        "url" -> "jdbc:mysql://hadoop000:3306/test?user=root&password=123456",
        "dbtable" -> "emp",
        "driver" -> "com.mysql.jdbc.Driver"))
      .load()
    emp.show()

    spark.stop()
  }
}
Running it throws an error:
Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
The JDBC driver cannot be found, so the driver jar needs to be added to the classpath.
Alternatively, add the dependency directly in the pom file:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>
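When the application is submitted from the command line rather than run inside the IDE, the connector jar can also be shipped with the job via --jars (the same flag used with spark-shell later in this post). A sketch only: the jar path and the application jar name (extds-app.jar) are assumptions based on this environment, not taken from the original.

```shell
# Hypothetical submit command: ship the MySQL connector jar alongside the job.
# ~/software/mysql-connector-java-5.1.27.jar and extds-app.jar are illustrative paths.
spark-submit \
  --master local[2] \
  --class ExtDSApp \
  --jars ~/software/mysql-connector-java-5.1.27.jar \
  extds-app.jar
```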
After adding the jar and running again, a different error appears:
Exception in thread "main" java.sql.SQLException: null, message from server: "Host '192.168.137.1' is not allowed to connect to this MySQL server"
The client host has no permission to access the database, so access must be granted on the MySQL side:
mysql> grant all privileges on test.* to root@'192.168.137.251' identified by '123456';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
Run it again, and the read succeeds:
+-----+------+---------+----+----------+-------+------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+------+---------+----+----------+-------+------+------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| 0.0| 20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| 1981-4-2| 2975.0| 0.0| 20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| 1981-5-1| 2850.0| 0.0| 30|
| 7782| CLARK| MANAGER|7839| 1981-6-9| 2450.0| 0.0| 10|
| 7788| SCOTT| ANALYST|7566| 1987-4-19| 3000.0| 0.0| 20|
| 7839| KING|PRESIDENT| 0|1981-11-17| 5000.0| 0.0| 10|
| 7844|TURNER| SALESMAN|7698| 1981-9-8| 1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| 1987-5-23| 1100.0| 0.0| 20|
| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| 0.0| 30|
| 7902| FORD| ANALYST|7566| 1981-12-3| 3000.0| 0.0| 20|
| 7934|MILLER| CLERK|7782| 1982-1-23| 1300.0| 0.0| 10|
| 8888| HIVE| PROGRAM|7839| 1988-1-23|10300.0| 0.0| null|
+-----+------+---------+----+----------+-------+------+------+
2. Testing with spark-shell
[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar
scala> val empDF = spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://hadoop000:3306/test?user=root&password=123456","dbtable"->"emp","driver"->"com.mysql.jdbc.Driver")).load()
empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]
scala> empDF.show
+-----+------+---------+----+----------+-------+------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+------+---------+----+----------+-------+------+------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| 0.0| 20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| 1981-4-2| 2975.0| 0.0| 20|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| 1981-5-1| 2850.0| 0.0| 30|
| 7782| CLARK| MANAGER|7839| 1981-6-9| 2450.0| 0.0| 10|
| 7788| SCOTT| ANALYST|7566| 1987-4-19| 3000.0| 0.0| 20|
| 7839| KING|PRESIDENT| 0|1981-11-17| 5000.0| 0.0| 10|
| 7844|TURNER| SALESMAN|7698| 1981-9-8| 1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| 1987-5-23| 1100.0| 0.0| 20|
| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| 0.0| 30|
| 7902| FORD| ANALYST|7566| 1981-12-3| 3000.0| 0.0| 20|
| 7934|MILLER| CLERK|7782| 1982-1-23| 1300.0| 0.0| 10|
| 8888| HIVE| PROGRAM|7839| 1988-1-23|10300.0| 0.0| null|
+-----+------+---------+----+----------+-------+------+------+
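The read above pulls the whole table through a single JDBC connection. Spark's JDBC source can also split the read across partitions. A sketch, assuming the same spark session and MySQL instance as above; the bounds chosen here (7000 to 9000 over empno) are illustrative and only steer the partition stride, they do not filter rows.

```scala
// Sketch: partitioned JDBC read (assumes `spark` from the spark-shell above)
val empPartitioned = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop000:3306/test")
  .option("dbtable", "emp")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("partitionColumn", "empno") // must be numeric, date, or timestamp
  .option("lowerBound", "7000")       // bounds define the partition stride,
  .option("upperBound", "9000")       // rows outside them are still read
  .option("numPartitions", "4")
  .load()
```

Each of the four partitions issues its own range query against MySQL, so the read runs in parallel instead of through one connection.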
3. Joining Spark SQL/Hive data with JDBC data
The data already in Spark SQL is as follows:
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| dept| false|
| default| emp| false|
+--------+---------+-----------+
scala> spark.sql("select * from dept").show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPREATIONS| BOSTON|
+------+----------+--------+
Now join this with the emp table from MySQL (empDF, created from the MySQL emp table above):
scala> val deptDF = spark.table("dept")
deptDF: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]
scala> deptDF.show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPREATIONS| BOSTON|
+------+----------+--------+
scala> empDF.join(deptDF, empDF.col("deptno")===deptDF.col("deptno")).show
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|deptno| dname| loc|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| 0.0| 20| 20| RESEARCH| DALLAS|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0| 30| 30| SALES| CHICAGO|
| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0| 30| 30| SALES| CHICAGO|
| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| 0.0| 20| 20| RESEARCH| DALLAS|
| 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0| 30| 30| SALES| CHICAGO|
| 7698| BLAKE| MANAGER|7839| 1981-5-1|2850.0| 0.0| 30| 30| SALES| CHICAGO|
| 7782| CLARK| MANAGER|7839| 1981-6-9|2450.0| 0.0| 10| 10|ACCOUNTING|NEW YORK|
| 7788| SCOTT| ANALYST|7566| 1987-4-19|3000.0| 0.0| 20| 20| RESEARCH| DALLAS|
| 7839| KING|PRESIDENT| 0|1981-11-17|5000.0| 0.0| 10| 10|ACCOUNTING|NEW YORK|
| 7844|TURNER| SALESMAN|7698| 1981-9-8|1500.0| 0.0| 30| 30| SALES| CHICAGO|
| 7876| ADAMS| CLERK|7788| 1987-5-23|1100.0| 0.0| 20| 20| RESEARCH| DALLAS|
| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| 0.0| 30| 30| SALES| CHICAGO|
| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| 0.0| 20| 20| RESEARCH| DALLAS|
| 7934|MILLER| CLERK|7782| 1982-1-23|1300.0| 0.0| 10| 10|ACCOUNTING|NEW YORK|
+-----+------+---------+----+----------+------+------+------+------+----------+--------+
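The output above carries two deptno columns, one from each side of the join. Passing the join key as a sequence of column names keeps a single deptno in the result. A minimal sketch, assuming the empDF and deptDF defined above:

```scala
// Sketch: join on the shared column name to avoid the duplicated deptno
val joined = empDF.join(deptDF, Seq("deptno"))
joined.select("empno", "ename", "deptno", "dname", "loc").show()
```

This form performs the same equi-join but coalesces the key into one column, which avoids ambiguous-column errors in downstream selects.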
4. Writing from Spark SQL to MySQL
scala> val empDF = spark.table("emp");
empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]
scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
Check MySQL:
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| dept |
| emp |
| emp_sparksql |
| user |
+----------------+
4 rows in set (0.00 sec)
Writing a second time fails because the table already exists:
scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
org.apache.spark.sql.AnalysisException: Table or view 'test.emp_sparksql' already exists. SaveMode: ErrorIfExists.;
A mode parameter is needed:
scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
mode("append") is also available when needed.
Writing this way produces one difference in the resulting table:
mysql> desc emp_sparksql;
+----------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+---------+------+-----+---------+-------+
| empno | int(11) | YES | | NULL | |
| ename | text | YES | | NULL | |
| job | text | YES | | NULL | |
| mgr | int(11) | YES | | NULL | |
| hiredate | text | YES | | NULL | |
| salary | double | YES | | NULL | |
| comm | double | YES | | NULL | |
| deptno | int(11) | YES | | NULL | |
+----------+---------+------+-----+---------+-------+
8 rows in set (0.00 sec)
mysql> desc emp;
+----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| empno | int(11) | YES | | NULL | |
| ename | varchar(100) | YES | | NULL | |
| job | varchar(100) | YES | | NULL | |
| mgr | int(11) | YES | | NULL | |
| hiredate | varchar(100) | YES | | NULL | |
| sal | double | YES | | NULL | |
| comm | double | YES | | NULL | |
| deptno | int(11) | YES | | NULL | |
+----------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)
The column types have changed (varchar(100) became text). An option can be added to specify the type of each column:
scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp1_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").option("createTableColumnTypes", "ename varchar(100),job varchar(100), hiredate varchar(100)").save()
mysql> desc emp1_sparksql;
+----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| empno | int(11) | YES | | NULL | |
| ename | varchar(100) | YES | | NULL | |
| job | varchar(100) | YES | | NULL | |
| mgr | int(11) | YES | | NULL | |
| hiredate | varchar(100) | YES | | NULL | |
| salary | double | YES | | NULL | |
| comm | double | YES | | NULL | |
| deptno | int(11) | YES | | NULL | |
+----------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)
Appendix: source code of the table function:
/**
 * Returns the specified table/view as a `DataFrame`.
 *
 * @param tableName is either a qualified or unqualified name that designates a table or view.
 *                  If a database is specified, it identifies the table/view from the database.
 *                  Otherwise, it first attempts to find a temporary view with the given name
 *                  and then match the table/view from the current database.
 *                  Note that, the global temporary view database is also valid here.
 * @since 2.0.0
 */
def table(tableName: String): DataFrame = {
  table(sessionState.sqlParser.parseTableIdentifier(tableName))
}
5. Reading JDBC data with plain SQL
[hadoop@hadoop000 bin]$ ./spark-sql --master local[2] --driver-class-path ~/software/mysql-connector-java-5.1.27.jar
spark-sql> CREATE TEMPORARY VIEW emp_mysql USING org.apache.spark.sql.jdbc OPTIONS (url "jdbc:mysql://hadoop000:3306/",dbtable "test.emp",user 'root', password '123456');
Time taken: 0.517 seconds
18/09/26 02:53:17 INFO thriftserver.SparkSQLCLIDriver: Time taken: 0.517 seconds
spark-sql> show tables;
default dept false
default emp false
emp_mysql true
Time taken: 0.142 seconds, Fetched 3 row(s)
spark-sql> select * from emp_mysql;
7369 SMITH CLERK 7902 1980-12-17 800.0 0.0 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.0 300.0 30
7521 WARD SALESMAN 7698 1981-2-22 1250.0 500.0 30
7566 JONES MANAGER 7839 1981-4-2 2975.0 0.0 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.0 1400.0 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.0 0.0 30
7782 CLARK MANAGER 7839 1981-6-9 2450.0 0.0 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.0 0.0 20
7839 KING PRESIDENT 0 1981-11-17 5000.0 0.0 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.0 0.0 30
7876 ADAMS CLERK 7788 1987-5-23 1100.0 0.0 20
7900 JAMES CLERK 7698 1981-12-3 950.0 0.0 30
7902 FORD ANALYST 7566 1981-12-3 3000.0 0.0 20
7934 MILLER CLERK 7782 1982-1-23 1300.0 0.0 10
8888 HIVE PROGRAM 7839 1988-1-23 10300.0 0.0 NULL
Time taken: 2.844 seconds, Fetched 15 row(s)