Reading and Writing JDBC Data with SparkSQL


Author: 白面葫芦娃92 | Published 2018-09-26 11:56

    1. Reading a JDBC data source with SparkSQL in IDEA
    First, take a look at the data in MySQL:

    mysql> use test;
    
    mysql> create table emp(empno int, ename varchar(100),job varchar(100),mgr int, hiredate varchar(100), sal double, comm double, deptno int);
    
    mysql> load data infile '/usr/local/mysql/data/emp.txt' replace into table emp fields terminated by '\t';
    
    mysql> create table dept(deptno int, dname varchar(100),loc varchar(100));   
    
    mysql> load data infile '/usr/local/mysql/data/dept.txt' replace into table dept fields terminated by '\t';
    
    mysql> show tables;
    +----------------+
    | Tables_in_test |
    +----------------+
    | dept           |
    | emp            |
    | user           |
    +----------------+
    3 rows in set (0.00 sec)
    
    mysql> select * from emp;
    +-------+--------+-----------+------+------------+-------+------+--------+
    | empno | ename  | job       | mgr  | hiredate   | sal   | comm | deptno |
    +-------+--------+-----------+------+------------+-------+------+--------+
    |  7369 | SMITH  | CLERK     | 7902 | 1980-12-17 |   800 |    0 |     20 |
    |  7499 | ALLEN  | SALESMAN  | 7698 | 1981-2-20  |  1600 |  300 |     30 |
    |  7521 | WARD   | SALESMAN  | 7698 | 1981-2-22  |  1250 |  500 |     30 |
    |  7566 | JONES  | MANAGER   | 7839 | 1981-4-2   |  2975 |    0 |     20 |
    |  7654 | MARTIN | SALESMAN  | 7698 | 1981-9-28  |  1250 | 1400 |     30 |
    |  7698 | BLAKE  | MANAGER   | 7839 | 1981-5-1   |  2850 |    0 |     30 |
    |  7782 | CLARK  | MANAGER   | 7839 | 1981-6-9   |  2450 |    0 |     10 |
    |  7788 | SCOTT  | ANALYST   | 7566 | 1987-4-19  |  3000 |    0 |     20 |
    |  7839 | KING   | PRESIDENT |    0 | 1981-11-17 |  5000 |    0 |     10 |
    |  7844 | TURNER | SALESMAN  | 7698 | 1981-9-8   |  1500 |    0 |     30 |
    |  7876 | ADAMS  | CLERK     | 7788 | 1987-5-23  |  1100 |    0 |     20 |
    |  7900 | JAMES  | CLERK     | 7698 | 1981-12-3  |   950 |    0 |     30 |
    |  7902 | FORD   | ANALYST   | 7566 | 1981-12-3  |  3000 |    0 |     20 |
    |  7934 | MILLER | CLERK     | 7782 | 1982-1-23  |  1300 |    0 |     10 |
    |  8888 | HIVE   | PROGRAM   | 7839 | 1988-1-23  | 10300 |    0 |   NULL |
    +-------+--------+-----------+------+------------+-------+------+--------+
    15 rows in set (0.00 sec)
    

    The code in IDEA is as follows:

    import org.apache.spark.sql.SparkSession
    
    object ExtDSApp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("ExtDSApp")
          .master("local[2]")
          .getOrCreate()
    
        // Read the emp table from MySQL through the JDBC data source
        val emp = spark.read.format("jdbc")
          .options(Map(
            "url" -> "jdbc:mysql://hadoop000:3306/test?user=root&password=123456",
            "dbtable" -> "emp",
            "driver" -> "com.mysql.jdbc.Driver"))
          .load()
        emp.show()
    
        spark.stop()
      }
    }
    

    Running it throws an error:

    Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    
    The JDBC driver cannot be found, so the MySQL driver jar needs to be added to the classpath.

    Alternatively, add the dependency directly to the pom file:

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
    </dependency>
    

    After adding the jar and running again, a different error appears:

    Exception in thread "main" java.sql.SQLException: null,  message from server: "Host '192.168.137.1' is not allowed to connect to this MySQL server"
    

    The client host is not allowed to access the database, so access has to be granted on the MySQL side:

    mysql> grant all privileges on test.* to root@'192.168.137.1' identified by '123456';
    Query OK, 0 rows affected (0.00 sec)
    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)
    

    Running again, the read succeeds:

    +-----+------+---------+----+----------+-------+------+------+
    |empno| ename|      job| mgr|  hiredate|    sal|  comm|deptno|
    +-----+------+---------+----+----------+-------+------+------+
    | 7369| SMITH|    CLERK|7902|1980-12-17|  800.0|   0.0|    20|
    | 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0|    30|
    | 7521|  WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0|    30|
    | 7566| JONES|  MANAGER|7839|  1981-4-2| 2975.0|   0.0|    20|
    | 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0|    30|
    | 7698| BLAKE|  MANAGER|7839|  1981-5-1| 2850.0|   0.0|    30|
    | 7782| CLARK|  MANAGER|7839|  1981-6-9| 2450.0|   0.0|    10|
    | 7788| SCOTT|  ANALYST|7566| 1987-4-19| 3000.0|   0.0|    20|
    | 7839|  KING|PRESIDENT|   0|1981-11-17| 5000.0|   0.0|    10|
    | 7844|TURNER| SALESMAN|7698|  1981-9-8| 1500.0|   0.0|    30|
    | 7876| ADAMS|    CLERK|7788| 1987-5-23| 1100.0|   0.0|    20|
    | 7900| JAMES|    CLERK|7698| 1981-12-3|  950.0|   0.0|    30|
    | 7902|  FORD|  ANALYST|7566| 1981-12-3| 3000.0|   0.0|    20|
    | 7934|MILLER|    CLERK|7782| 1982-1-23| 1300.0|   0.0|    10|
    | 8888|  HIVE|  PROGRAM|7839| 1988-1-23|10300.0|   0.0|  null|
    +-----+------+---------+----+----------+-------+------+------+
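
    The same read can also be written with the jdbc() convenience method and a java.util.Properties object. A minimal sketch, assuming the same host and credentials as above:

    import java.util.Properties

    // Connection settings passed separately instead of in the URL query string
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")

    // read.jdbc(url, table, properties) yields the same DataFrame as the options form
    val emp = spark.read.jdbc("jdbc:mysql://hadoop000:3306/test", "emp", props)
    emp.show()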
    

    2. Testing in spark-shell

    [hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar 
    scala> val empDF = spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://hadoop000:3306/test?user=root&password=123456","dbtable"->"emp","driver"->"com.mysql.jdbc.Driver")).load()
    empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]
    
    scala> empDF.show
    +-----+------+---------+----+----------+-------+------+------+
    |empno| ename|      job| mgr|  hiredate|    sal|  comm|deptno|
    +-----+------+---------+----+----------+-------+------+------+
    | 7369| SMITH|    CLERK|7902|1980-12-17|  800.0|   0.0|    20|
    | 7499| ALLEN| SALESMAN|7698| 1981-2-20| 1600.0| 300.0|    30|
    | 7521|  WARD| SALESMAN|7698| 1981-2-22| 1250.0| 500.0|    30|
    | 7566| JONES|  MANAGER|7839|  1981-4-2| 2975.0|   0.0|    20|
    | 7654|MARTIN| SALESMAN|7698| 1981-9-28| 1250.0|1400.0|    30|
    | 7698| BLAKE|  MANAGER|7839|  1981-5-1| 2850.0|   0.0|    30|
    | 7782| CLARK|  MANAGER|7839|  1981-6-9| 2450.0|   0.0|    10|
    | 7788| SCOTT|  ANALYST|7566| 1987-4-19| 3000.0|   0.0|    20|
    | 7839|  KING|PRESIDENT|   0|1981-11-17| 5000.0|   0.0|    10|
    | 7844|TURNER| SALESMAN|7698|  1981-9-8| 1500.0|   0.0|    30|
    | 7876| ADAMS|    CLERK|7788| 1987-5-23| 1100.0|   0.0|    20|
    | 7900| JAMES|    CLERK|7698| 1981-12-3|  950.0|   0.0|    30|
    | 7902|  FORD|  ANALYST|7566| 1981-12-3| 3000.0|   0.0|    20|
    | 7934|MILLER|    CLERK|7782| 1982-1-23| 1300.0|   0.0|    10|
    | 8888|  HIVE|  PROGRAM|7839| 1988-1-23|10300.0|   0.0|  null|
    +-----+------+---------+----+----------+-------+------+------+
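
    By default the JDBC source reads the whole table over a single connection. For larger tables the read can be parallelized by giving Spark a numeric partition column and its bounds; a minimal sketch, with the bounds taken from the empno range above:

    val empPar = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop000:3306/test?user=root&password=123456")
      .option("dbtable", "emp")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("partitionColumn", "empno") // numeric column to split on
      .option("lowerBound", "7369")       // bounds only shape the partition ranges;
      .option("upperBound", "8888")       // rows outside them are still read
      .option("numPartitions", "4")       // at most 4 concurrent JDBC connections
      .load()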
    

    3. Joining SparkSQL/Hive data with JDBC data
    The data already in SparkSQL is as follows:

    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    | default|     dept|      false|
    | default|      emp|      false|
    +--------+---------+-----------+
    
    scala> spark.sql("select * from dept").show
    +------+----------+--------+
    |deptno|     dname|     loc|
    +------+----------+--------+
    |    10|ACCOUNTING|NEW YORK|
    |    20|  RESEARCH|  DALLAS|
    |    30|     SALES| CHICAGO|
    |    40|OPREATIONS|  BOSTON|
    +------+----------+--------+
    

    Next, join it with the emp table from MySQL (already loaded above as empDF):

    scala> val deptDF = spark.table("dept")
    deptDF: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 1 more field]
    
    scala> deptDF.show
    +------+----------+--------+
    |deptno|     dname|     loc|
    +------+----------+--------+
    |    10|ACCOUNTING|NEW YORK|
    |    20|  RESEARCH|  DALLAS|
    |    30|     SALES| CHICAGO|
    |    40|OPREATIONS|  BOSTON|
    +------+----------+--------+
    
    scala> empDF.join(deptDF, empDF.col("deptno")===deptDF.col("deptno")).show
    +-----+------+---------+----+----------+------+------+------+------+----------+--------+
    |empno| ename|      job| mgr|  hiredate|   sal|  comm|deptno|deptno|     dname|     loc|
    +-----+------+---------+----+----------+------+------+------+------+----------+--------+
    | 7369| SMITH|    CLERK|7902|1980-12-17| 800.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
    | 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0|    30|    30|     SALES| CHICAGO|
    | 7521|  WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0|    30|    30|     SALES| CHICAGO|
    | 7566| JONES|  MANAGER|7839|  1981-4-2|2975.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
    | 7654|MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0|    30|    30|     SALES| CHICAGO|
    | 7698| BLAKE|  MANAGER|7839|  1981-5-1|2850.0|   0.0|    30|    30|     SALES| CHICAGO|
    | 7782| CLARK|  MANAGER|7839|  1981-6-9|2450.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
    | 7788| SCOTT|  ANALYST|7566| 1987-4-19|3000.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
    | 7839|  KING|PRESIDENT|   0|1981-11-17|5000.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
    | 7844|TURNER| SALESMAN|7698|  1981-9-8|1500.0|   0.0|    30|    30|     SALES| CHICAGO|
    | 7876| ADAMS|    CLERK|7788| 1987-5-23|1100.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
    | 7900| JAMES|    CLERK|7698| 1981-12-3| 950.0|   0.0|    30|    30|     SALES| CHICAGO|
    | 7902|  FORD|  ANALYST|7566| 1981-12-3|3000.0|   0.0|    20|    20|  RESEARCH|  DALLAS|
    | 7934|MILLER|    CLERK|7782| 1982-1-23|1300.0|   0.0|    10|    10|ACCOUNTING|NEW YORK|
    +-----+------+---------+----+----------+------+------+------+------+----------+--------+
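
    The joined result can be aggregated like any other DataFrame. A minimal sketch that averages salary per department name, using only columns shown above:

    import org.apache.spark.sql.functions.avg

    empDF.join(deptDF, empDF.col("deptno") === deptDF.col("deptno"))
      .groupBy(deptDF.col("dname"))             // one row per department
      .agg(avg(empDF.col("sal")).as("avg_sal")) // average salary within each group
      .show()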
    

    4. Writing from SparkSQL to MySQL
    (Note that empDF is re-bound below via spark.table to the SparkSQL emp table, replacing the JDBC-backed empDF from part 2.)

    scala> val empDF = spark.table("emp");
    empDF: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]
    
    scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
    

    Check MySQL:

    mysql> show tables;
    +----------------+
    | Tables_in_test |
    +----------------+
    | dept           |
    | emp            |
    | emp_sparksql   |
    | user           |
    +----------------+
    4 rows in set (0.00 sec)
    

    Writing a second time fails because the table already exists:

    scala> empDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
    org.apache.spark.sql.AnalysisException: Table or view 'test.emp_sparksql' already exists. SaveMode: ErrorIfExists.;
    

    A save mode needs to be specified:

    scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").save()
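
    When existing rows should be kept and new rows added instead, append mode does the job; a minimal sketch using the SaveMode enum form of the same call:

    import org.apache.spark.sql.SaveMode

    // SaveMode.Append inserts into the existing table instead of dropping and recreating it
    empDF.write.mode(SaveMode.Append).format("jdbc")
      .option("url", "jdbc:mysql://hadoop000:3306")
      .option("dbtable", "test.emp_sparksql")
      .option("user", "root")
      .option("password", "123456")
      .option("driver", "com.mysql.jdbc.Driver")
      .save()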
    

    Writing through the JDBC source produces one noticeable difference: the column types of the generated table differ from those of the source table:

    mysql> desc emp_sparksql;
    +----------+---------+------+-----+---------+-------+
    | Field    | Type    | Null | Key | Default | Extra |
    +----------+---------+------+-----+---------+-------+
    | empno    | int(11) | YES  |     | NULL    |       |
    | ename    | text    | YES  |     | NULL    |       |
    | job      | text    | YES  |     | NULL    |       |
    | mgr      | int(11) | YES  |     | NULL    |       |
    | hiredate | text    | YES  |     | NULL    |       |
    | salary   | double  | YES  |     | NULL    |       |
    | comm     | double  | YES  |     | NULL    |       |
    | deptno   | int(11) | YES  |     | NULL    |       |
    +----------+---------+------+-----+---------+-------+
    8 rows in set (0.00 sec)
    
    mysql> desc emp;
    +----------+--------------+------+-----+---------+-------+
    | Field    | Type         | Null | Key | Default | Extra |
    +----------+--------------+------+-----+---------+-------+
    | empno    | int(11)      | YES  |     | NULL    |       |
    | ename    | varchar(100) | YES  |     | NULL    |       |
    | job      | varchar(100) | YES  |     | NULL    |       |
    | mgr      | int(11)      | YES  |     | NULL    |       |
    | hiredate | varchar(100) | YES  |     | NULL    |       |
    | sal      | double       | YES  |     | NULL    |       |
    | comm     | double       | YES  |     | NULL    |       |
    | deptno   | int(11)      | YES  |     | NULL    |       |
    +----------+--------------+------+-----+---------+-------+
    8 rows in set (0.00 sec)
    

    The column types changed (varchar columns became text). The createTableColumnTypes option specifies the column types to use when Spark creates the table:

    scala> empDF.write.mode("overwrite").format("jdbc").option("url", "jdbc:mysql://hadoop000:3306").option("dbtable", "test.emp1_sparksql").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").option("createTableColumnTypes", "ename varchar(100),job varchar(100), hiredate varchar(100)").save()
    
    mysql> desc emp1_sparksql;
    +----------+--------------+------+-----+---------+-------+
    | Field    | Type         | Null | Key | Default | Extra |
    +----------+--------------+------+-----+---------+-------+
    | empno    | int(11)      | YES  |     | NULL    |       |
    | ename    | varchar(100) | YES  |     | NULL    |       |
    | job      | varchar(100) | YES  |     | NULL    |       |
    | mgr      | int(11)      | YES  |     | NULL    |       |
    | hiredate | varchar(100) | YES  |     | NULL    |       |
    | salary   | double       | YES  |     | NULL    |       |
    | comm     | double       | YES  |     | NULL    |       |
    | deptno   | int(11)      | YES  |     | NULL    |       |
    +----------+--------------+------+-----+---------+-------+
    8 rows in set (0.00 sec)
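
    As on the read side, the write can also go through the jdbc() convenience method with a Properties object; a minimal sketch, assuming the same connection settings as above:

    import java.util.Properties

    val connProps = new Properties()
    connProps.setProperty("user", "root")
    connProps.setProperty("password", "123456")
    connProps.setProperty("driver", "com.mysql.jdbc.Driver")

    // Equivalent to the options-based overwrite above
    empDF.write.mode("overwrite").jdbc("jdbc:mysql://hadoop000:3306", "test.emp_sparksql", connProps)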
    

    Appendix: the source of the table function:

    /**
       * Returns the specified table/view as a `DataFrame`.
       *
       * @param tableName is either a qualified or unqualified name that designates a table or view.
       *                  If a database is specified, it identifies the table/view from the database.
       *                  Otherwise, it first attempts to find a temporary view with the given name
       *                  and then match the table/view from the current database.
       *                  Note that, the global temporary view database is also valid here.
       * @since 2.0.0
       */
      def table(tableName: String): DataFrame = {
        table(sessionState.sqlParser.parseTableIdentifier(tableName))
      }
    

    5. Reading JDBC data with plain SQL

    [hadoop@hadoop000 bin]$ ./spark-sql --master local[2] --driver-class-path ~/software/mysql-connector-java-5.1.27.jar
    spark-sql> CREATE TEMPORARY VIEW emp_mysql USING org.apache.spark.sql.jdbc OPTIONS (url "jdbc:mysql://hadoop000:3306/", dbtable "test.emp", user 'root', password '123456');
    Time taken: 0.517 seconds
    18/09/26 02:53:17 INFO thriftserver.SparkSQLCLIDriver: Time taken: 0.517 seconds
    spark-sql> show tables;
    default dept    false
    default emp     false
            emp_mysql       true
    Time taken: 0.142 seconds, Fetched 3 row(s)
    spark-sql> select * from emp_mysql;
    7369    SMITH   CLERK   7902    1980-12-17      800.0   0.0     20
    7499    ALLEN   SALESMAN        7698    1981-2-20       1600.0  300.0   30
    7521    WARD    SALESMAN        7698    1981-2-22       1250.0  500.0   30
    7566    JONES   MANAGER 7839    1981-4-2        2975.0  0.0     20
    7654    MARTIN  SALESMAN        7698    1981-9-28       1250.0  1400.0  30
    7698    BLAKE   MANAGER 7839    1981-5-1        2850.0  0.0     30
    7782    CLARK   MANAGER 7839    1981-6-9        2450.0  0.0     10
    7788    SCOTT   ANALYST 7566    1987-4-19       3000.0  0.0     20
    7839    KING    PRESIDENT       0       1981-11-17      5000.0  0.0     10
    7844    TURNER  SALESMAN        7698    1981-9-8        1500.0  0.0     30
    7876    ADAMS   CLERK   7788    1987-5-23       1100.0  0.0     20
    7900    JAMES   CLERK   7698    1981-12-3       950.0   0.0     30
    7902    FORD    ANALYST 7566    1981-12-3       3000.0  0.0     20
    7934    MILLER  CLERK   7782    1982-1-23       1300.0  0.0     10
    8888    HIVE    PROGRAM 7839    1988-1-23       10300.0 0.0     NULL
    Time taken: 2.844 seconds, Fetched 15 row(s)
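
    One more trick that applies to both the OPTIONS clause above and the Scala API: dbtable accepts not only a table name but any parenthesized subquery the database can run, which pushes the filtering down to MySQL. A minimal sketch (the alias t is required):

    val highPaid = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop000:3306/test?user=root&password=123456")
      .option("dbtable", "(select empno, ename, sal from emp where sal > 2000) t")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
    highPaid.show()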
    
