Spark: Reading MySQL Data as a DataFrame

Author: 利伊奥克儿 | Published 2018-10-12 15:18

    This post shows how to read MySQL data into a Spark DataFrame.

    import java.io.FileInputStream
    import java.util.Properties

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
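
    // NOTE: the MySQL JDBC driver (com.mysql.jdbc.Driver) must be on the
    // classpath; in sbt, for example (pin the version to your server):
    //   libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.47"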
    
      /**
        * Load the configuration file
        *
        * @param proPath path to the properties file
        * @return the loaded Properties
        */
      def getProPerties(proPath: String): Properties = {
        val properties: Properties = new Properties()
        properties.load(new FileInputStream(proPath))
        properties
      }
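
      // A quick sanity check of the loaded config (path is hypothetical):
      //   val props = getProPerties("/home/user/conf/job.properties")
      //   println(props.getProperty("mysql.url"))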
    
      /**
        * Fetch a MySQL table's data
        *
        * @param sqlContext
        * @param tableName name of the MySQL table to read
        * @param proPath   path to the configuration file
        * @return the MySQL table as a DataFrame
        */
      def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
        val properties: Properties = getProPerties(proPath)
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          .option("dbtable", tableName)
          .load()
      }
    
      /**
        * Fetch a MySQL table's data, with a filter condition
        *
        * @param sqlContext
        * @param table           name of the MySQL table to read
        * @param filterCondition filter condition (body of a SQL WHERE clause)
        * @param proPath         path to the configuration file
        * @return the filtered MySQL table as a DataFrame
        */
      def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String): DataFrame = {
        val properties: Properties = getProPerties(proPath)
        // Wrap the condition in a subquery so the derived table acts as the data source
        val tableName = s"(select * from $table where $filterCondition) as t1"
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          .option("dbtable", tableName)
          .load()
      }
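
    Both helpers above pull the whole table (or subquery) through a single JDBC connection. For large tables, Spark's JDBC source can split the read across executors via its standard partitioning options. A minimal sketch along the same lines (the numeric, indexed column `id` and its bounds are assumptions; query the real min/max first in practice):

      def readMysqlTablePartitioned(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
        val properties: Properties = getProPerties(proPath)
        sqlContext
          .read
          .format("jdbc")
          .option("url", properties.getProperty("mysql.url"))
          .option("driver", properties.getProperty("mysql.driver"))
          .option("user", properties.getProperty("mysql.username"))
          .option("password", properties.getProperty("mysql.password"))
          .option("dbtable", tableName)
          .option("partitionColumn", "id") // assumed numeric, indexed column
          .option("lowerBound", "1")       // assumed min(id)
          .option("upperBound", "1000000") // assumed max(id)
          .option("numPartitions", "8")    // number of parallel JDBC reads
          .load()
      }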
    

    Usage example

    // Without a filter condition
    val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)
    val sqlContext: SQLContext = getSQLContext(sc)
    import sqlContext.implicits._
    val test_table_dataFrame: DataFrame = readMysqlTable(sqlContext, "TEST_TABLE", proPath).persist(PERSIST_LEVEL)
    ----------------------------------------------------------------------------------------------------
    // With a filter condition
    // Fetch the rows where task_id = ${task_id} as a DataFrame
    val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
    val sc: SparkContext = new SparkContext(conf)
    val sqlContext: SQLContext = getSQLContext(sc)
    import sqlContext.implicits._
    val test_table_dataFrame = readMysqlTable(sqlContext, "TEST_TABLE", s"task_id=${task_id}", configPath)
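
    getSQLContext and PERSIST_LEVEL come from the author's JobBase, which is not shown. A minimal sketch consistent with the usage above, assuming no Hive support is needed:

    import org.apache.spark.storage.StorageLevel

    // Plain SQLContext over the given SparkContext (assumption: JobBase does the same)
    def getSQLContext(sc: SparkContext): SQLContext = new SQLContext(sc)

    // Cache in memory, spilling to disk when memory runs out (assumed default)
    val PERSIST_LEVEL = StorageLevel.MEMORY_AND_DISK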
    
    

    Configuration file excerpt

    # MySQL database settings
    mysql.driver=com.mysql.jdbc.Driver
    mysql.url=jdbc:mysql://0.0.0.0:3306/iptv?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
    mysql.username=lillclol
    mysql.password=123456
    
    #hive
    hive.root_path=hdfs://ns1/user/hive/warehouse/
    

    This is an original write-up from my day-to-day work; please credit the source when reposting!
