美文网首页
SparkSql操作Hive数据入Mysql中的注意事项和问题

SparkSql操作Hive数据入Mysql中的注意事项和问题

作者: IT_小白 | 来源:发表于2018-11-09 13:23 被阅读0次

    Version

     Spark:1.6.2
     Hive  : 1.1.0
    

    先看下代码:::::::::

        object SparkSql_Hive_Mysql {
            def main(args: Array[String]): Unit = {
                val url = "jdbc:mysql://0.0.0.0:3306/data?characterEncoding=UTF-8&serverTimezone=CST"
                val tableName = "test"
    
                val prop = new Properties()
                      prop.setProperty("user", "username")
                      prop.setProperty("password", "pwd")
                      prop.setProperty("useSSL", "false");
    
                val conf = new SparkConf()
                        .setAppName("test")
                        .setMaster("local[5]")
                val sc = new SparkContext(conf)
                val sqlContext = new HiveContext(sc)
    
                val sql = "SELECT * FROM test"
                val retDF = sqlContext.sql(sql)
    
                /**
                  * 将结果集已追加的方式写入 mysql 数据库
                  * Overwrite 覆盖
                  * Append    追加
                  * Ignore    忽略
                  */
                retDF.write.mode(SaveMode.Overwrite) jdbc(url, tableName, prop)
                sc.stop()
          }
        }
    

    使用封装好的写入方式:::::

                retDF.write.mode(SaveMode.Overwrite) jdbc(url, tableName, prop)
    

    现在说下我遇到的问题:::::
    在任务执行过程中出现了超过Mysql 数据库链接的最大数

    下面看下这个方法的源码:::::

           def saveTable(
                 df: DataFrame,
                 url: String,
                 table: String,
                 properties: Properties) {
               val dialect = JdbcDialects.get(url)
               val nullTypes: Array[Int] = df.schema.fields.map { field =>
                       getJdbcType(field.dataType, dialect).jdbcNullType
                 }
               val rddSchema = df.schema
               val getConnection: () => Connection = createConnectionFactory(url, properties)
               val batchSize = properties.getProperty("batchsize", "1000").toInt
                     df.foreachPartition { iterator =>
               savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
           }
         }
    #并且链接也都在使用后进行了关闭回收::::
           } finally {
                 if (!committed) {
                   // The stage must fail.  We got here through an exception path, so
                   // let the exception through unless rollback() or close() want to
                   // tell the user about another problem.
                   if (supportsTransactions) {
                         conn.rollback()
                       }
                   conn.close()
               } else {
                 // The stage must succeed.  We cannot propagate any exception close() might throw.
                 try {
                     conn.close()
               } catch {
                   case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
             }
           }
         }
    

    也就数说再不能增加Mysql最大链接数配置的情况下,使用

    Spark 中自带的 coalesce(“numPartition”)方法尽量减少分区数

    以此来减少创建的Mysql 连接数,来减少因为超过最大连接数而导致的问题。

    至于为什么用coalesce方法而不用repartition方法我在另一篇文章中有讲到。
    感兴趣可以点击查看!传送门在此

    相关文章

      网友评论

          本文标题:SparkSql操作Hive数据入Mysql中的注意事项和问题

          本文链接:https://www.haomeiwen.com/subject/byznxqtx.html