SparkSQL DataFrames and MySQL: All About CRUD

Author: 腾飞的大象 | Published 2018-11-28 09:55

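    After you compute metrics with Spark's operators, you usually need to store the results in a relational database such as MySQL or PostgreSQL. You then connect that database to a dashboard platform, which renders the colorful charts. Below I explain how to run CRUD operations on MySQL from Spark through a c3p0 connection pool: Create, Retrieve, Update, and Delete.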

    Project on GitHub: spark-mysql

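    1. Create
      import java.sql.{Date, Timestamp}
      import hiveContext.implicits._ // enables toDF() on a Seq of case classes

      // Declare the case class at top level (outside any method) so Spark can derive its schema
      case class CardMember(m_id: String, card_type: String, expire: Timestamp, duration: Int,
                            is_sale: Boolean, date: Date, user: Long, salary: Float)

      val memberSeq = Seq(
        CardMember("member_2", "monthly", new Timestamp(System.currentTimeMillis()), 31, false, new Date(System.currentTimeMillis()), 123223, 0.32f),
        CardMember("member_1", "quarterly", new Timestamp(System.currentTimeMillis()), 93, false, new Date(System.currentTimeMillis()), 124224, 0.362f)
      )
      val memberDF = memberSeq.toDF()
      // Write the DataFrame to MySQL; the table is created automatically if it does not exist
      MySQLUtils.saveDFtoDBCreateTableIfNotExist("member_test", memberDF)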
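    saveDFtoDBCreateTableIfNotExist eventually runs a positional `insert into member_test values(?,...)`. That statement only works when the DataFrame's column order exactly matches the table's, which is why verifyFieldConsistency exists. The following is a minimal sketch of an alternative that names every column explicitly. `InsertSql` and its methods are hypothetical helpers, not part of the original MySQLUtils.

```scala
object InsertSql {
  // Hypothetical helper, not part of the original MySQLUtils: names every column explicitly,
  // so the table's physical column order no longer has to match the DataFrame.

  // Wraps an identifier in backticks; identifiers that already contain a backtick are rejected
  def quote(identifier: String): String = {
    require(!identifier.contains("`"), s"illegal identifier: $identifier")
    "`" + identifier + "`"
  }

  // Produces e.g. INSERT INTO `t` (`a`, `b`) VALUES (?, ?)
  def withColumns(table: String, columns: Seq[String]): String = {
    require(columns.nonEmpty, "at least one column is required")
    val cols = columns.map(quote).mkString(", ")
    val placeholders = columns.map(_ => "?").mkString(", ")
    s"INSERT INTO ${quote(table)} ($cols) VALUES ($placeholders)"
  }
}
```

    With the DataFrame above, `InsertSql.withColumns("member_test", memberDF.columns.toSeq)` would list `m_id`, `card_type`, and the other fields by name. The JDBC parameter indexes then follow the DataFrame's order no matter how the table was created.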
    
    
    2. Retrieve
        // Load a MySQL table as a DataFrame by name; the third argument is an optional filter condition
        val memberFromDB = MySQLUtils.getDFFromMysql(hiveContext, "member_test", null)
    
    3. Update
        // If a row with the same primary key exists, update the listed columns; otherwise insert the row
        MySQLUtils.insertOrUpdateDFtoDBUsePool("member_test", memberDF, Array("user", "salary"))
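    insertOrUpdateDFtoDBUsePool binds every row twice. The first n placeholders carry all n columns for the INSERT part. The next k placeholders repeat the values of the k updateColumns for the ON DUPLICATE KEY UPDATE part, located with record.fieldIndex. The hypothetical helper below only computes that mapping from JDBC parameter index to DataFrame column index, so the index arithmetic is easy to check.

    ON DUPLICATE KEY UPDATE only fires on a primary-key or unique-index collision. createTableIfNotExist adds a primary key only for a column named id, and the member_test table from step 1 has no such column. Unless you add a key yourself, each "update" therefore inserts a new row.

```scala
object UpsertBindPlan {
  // Returns (jdbcParameterIndex, rowColumnIndex) pairs in the order insertOrUpdateDFtoDBUsePool binds them.
  // JDBC parameter indexes are 1-based; Row/DataFrame column indexes are 0-based.
  def plan(columns: Seq[String], updateColumns: Seq[String]): Seq[(Int, Int)] = {
    // INSERT part: parameter i+1 takes column i
    val insertPart = columns.indices.map(i => (i + 1, i))
    // UPDATE part: parameters n+1 .. n+k take the values of the update columns
    val updatePart = updateColumns.zipWithIndex.map { case (name, k) =>
      val fieldIndex = columns.indexOf(name)
      require(fieldIndex >= 0, s"update column $name is not in the DataFrame")
      (columns.length + k + 1, fieldIndex)
    }
    insertPart ++ updatePart
  }
}
```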
    
    4. Delete
      // Delete the rows that match a condition (the method defined in MySQLUtils is deleteMysqlTableData)
      MySQLUtils.deleteMysqlTableData(hiveContext, "member_test", "m_id = 'member_1'")
      // Drop the table
      MySQLUtils.dropMysqlTable(hiveContext, "member_test")
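    Both Delete calls splice their arguments directly into the SQL text (`delete from $mysqlTableName where $condition`), so the condition must never come from untrusted input. The following is a minimal sketch for the common case of deleting by a single column value. The hypothetical `DeleteSql` object builds a statement with one `?` placeholder, so the value is bound with `PreparedStatement.setString` instead of being concatenated. Table and column names cannot be bound as parameters, so they are validated against a strict pattern instead.

```scala
import java.sql.Connection

object DeleteSql {
  // Identifiers cannot be JDBC parameters, so only plain names are accepted
  private val SafeIdentifier = "[A-Za-z_][A-Za-z0-9_]*".r

  def byColumn(table: String, column: String): String = {
    for (name <- Seq(table, column))
      require(SafeIdentifier.pattern.matcher(name).matches(), s"unsafe identifier: $name")
    s"DELETE FROM `${table}` WHERE `${column}` = ?"
  }

  // Binds the value instead of splicing it into the SQL; returns the number of deleted rows
  def deleteByColumn(conn: Connection, table: String, column: String, value: String): Int = {
    val ps = conn.prepareStatement(byColumn(table, column))
    try {
      ps.setString(1, value)
      ps.executeUpdate()
    } finally {
      ps.close()
    }
  }
}
```

    In this project the connection would come from MySQLPoolManager.getMysqlManager.getConnection. For example, `DeleteSql.deleteByColumn(conn, "member_test", "m_id", "member_1")`, after which the caller still closes `conn` to return it to the pool.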
    

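    The detailed steps are as follows.
    Add the MySQL connector and c3p0 dependencies to pom.xml, then reimport the Maven project so the changes take effect: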

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>
            <dependency>
                <groupId>com.mchange</groupId>
                <artifactId>c3p0</artifactId>
                <version>0.9.5</version>
            </dependency>
    

    Obtaining connections from the pool, together with the DDL and DML operations, is wrapped in the following three utility classes.

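    PropertyUtils reads settings from conf/mysql-user.properties. The file is loaded with getResourceAsStream, so the conf directory must be on the classpath.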

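    package utils

    import java.util.Properties

    /**
      * Created with IntelliJ IDEA.
      * Author: fly_elephant@163.com
      * Description: PropertyUtils utility class
      * Date: created 2018-11-17 11:43
      */
    object PropertyUtils {
      // Reads one key from a properties file on the classpath; returns null if the key is absent
      def getFileProperties(fileName: String, propertyKey: String): String = {
        val in = this.getClass.getClassLoader.getResourceAsStream(fileName)
        if (in == null) throw new IllegalArgumentException(s"$fileName not found on the classpath")
        val prop = new Properties
        try prop.load(in) finally in.close() // always release the file handle
        prop.getProperty(propertyKey)
      }
    }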
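    The article never shows mysql-user.properties itself. Judging by the keys that MySQLPoolManager and MySQLUtils look up, the file needs roughly the entries below. Every value here (host, database name, credentials, pool sizes) is a made-up placeholder.

    The sketch also departs from PropertyUtils in two ways, both assumptions rather than the author's design:

    - It parses the file once instead of on every getFileProperties call.
    - It fails with a clear message when a key is missing. PropertyUtils instead returns null, which later surfaces as a NullPointerException in `.toInt`.

    Reading from a string keeps the example runnable without the classpath resource.

```scala
import java.io.StringReader
import java.util.Properties

object MysqlUserProps {
  // Placeholder contents for conf/mysql-user.properties; all values are illustrative assumptions
  val sample: String =
    """mysql.jdbc.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8
      |mysql.jdbc.username=spark
      |mysql.jdbc.password=changeme
      |mysql.pool.jdbc.driverClass=com.mysql.jdbc.Driver
      |mysql.pool.jdbc.minPoolSize=2
      |mysql.pool.jdbc.maxPoolSize=10
      |mysql.pool.jdbc.acquireIncrement=2
      |mysql.pool.jdbc.maxStatements=50
      |""".stripMargin

  // Parsed once on first use; the real project would read the classpath resource here instead
  lazy val props: Properties = {
    val p = new Properties()
    p.load(new StringReader(sample))
    p
  }

  // Fails fast with a descriptive error instead of returning null
  def required(key: String): String =
    Option(props.getProperty(key)).map(_.trim).getOrElse(
      throw new NoSuchElementException(s"missing key '$key' in mysql-user.properties"))
}
```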
    

    MySQLPoolManager creates the c3p0 connection pool and hands out connections from it.

    package utils

    import java.sql.Connection

    import com.mchange.v2.c3p0.ComboPooledDataSource

    /**
      * Created with IntelliJ IDEA.
      * Author: fly_elephant@163.com
      * Description: MySQL connection pool manager
      * Date: created 2018-11-17 12:43
      */
    object MySQLPoolManager {
      var mysqlManager: MysqlPool = _

      // Creates the pool on first use: one pool per JVM, i.e. one per Spark executor
      def getMysqlManager: MysqlPool = {
        synchronized {
          if (mysqlManager == null) {
            mysqlManager = new MysqlPool
          }
        }
        mysqlManager
      }

      class MysqlPool extends Serializable {
        private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
        try {
          cpds.setJdbcUrl(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.url"))
          cpds.setDriverClass(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.driverClass"))
          cpds.setUser(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.username"))
          cpds.setPassword(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.password"))
          cpds.setMinPoolSize(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.minPoolSize").toInt)
          cpds.setMaxPoolSize(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.maxPoolSize").toInt)
          cpds.setAcquireIncrement(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.acquireIncrement").toInt)
          cpds.setMaxStatements(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.maxStatements").toInt)
        } catch {
          case e: Exception => e.printStackTrace()
        }

        // Borrows a connection from the pool; returns null if none can be obtained, so callers should check
        def getConnection: Connection = {
          try {
            cpds.getConnection()
          } catch {
            case ex: Exception =>
              ex.printStackTrace()
              null
          }
        }

        // Shuts down the whole pool
        def close(): Unit = {
          try {
            cpds.close()
          } catch {
            case ex: Exception =>
              ex.printStackTrace()
          }
        }
      }

    }
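    getMysqlManager implements lazy, thread-safe initialization by hand with `synchronized` and a null check. In Scala, a `lazy val` inside an `object` gives the same once-per-JVM guarantee with less code, because its initializer runs at most once even under concurrent access.

    The sketch below uses a hypothetical stand-in class instead of c3p0, with a counter so the single initialization can be observed. In the real manager, `FakePool` would be `MysqlPool`.

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyPoolManager {
  // Counts how many times the stand-in pool has been constructed
  val created = new AtomicInteger(0)

  // Stand-in for MysqlPool; the real class would configure a ComboPooledDataSource here
  final class FakePool {
    created.incrementAndGet()
  }

  // Initialized on first access, at most once per JVM, even if several threads race for it
  lazy val pool: FakePool = new FakePool
}
```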
    
    
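    Every MySQLUtils method borrows a connection with getConnection and has to call close() in a finally block to hand it back to the pool. A method that forgets leaks one pooled connection per call.

    The loan pattern centralizes that cleanup: a helper opens the resource, lends it to your function, and always closes it. `Loan.withResource` below is a hypothetical helper. A fake AutoCloseable stands in for java.sql.Connection so the example runs without a database. With the real pool you would pass `MySQLPoolManager.getMysqlManager.getConnection` as the first argument.

```scala
object Loan {
  // Opens the resource, lends it to f, and always closes it afterwards, even when f throws.
  // A null resource (MysqlPool.getConnection returns null on failure) is rejected up front.
  def withResource[R <: AutoCloseable, A](open: => R)(f: R => A): A = {
    val resource = open
    if (resource == null) throw new IllegalStateException("could not obtain the resource")
    try f(resource) finally resource.close()
  }

  // Stand-in for java.sql.Connection that records whether close() was called
  final class FakeConnection extends AutoCloseable {
    var closed = false
    override def close(): Unit = { closed = true }
  }
}
```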

    MySQLUtils wraps the CRUD methods, which can be called directly.

    package utils

    import java.sql.{Date, Timestamp}
    import java.util.Properties

    import org.apache.log4j.Logger
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, SQLContext}

    /**
      * Created with IntelliJ IDEA.
      * Author: fly_elephant@163.com
      * Description: MySQL DDL and DML utility class
      * Date: created 2018-11-17 12:43
      */
    object MySQLUtils {
      val logger: Logger = Logger.getLogger(getClass.getSimpleName)
    
      /**
        * Writes a DataFrame to MySQL in batches through a c3p0 pooled connection.
        * The DataFrame columns must match the table columns in number and order.
        *
        * @param tableName       table name
        * @param resultDateFrame DataFrame to write
        */
      def saveDFtoDBUsePool(tableName: String, resultDateFrame: DataFrame): Unit = {
        val colNumbers = resultDateFrame.columns.length
        val sql = getInsertSql(tableName, colNumbers)
        val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
        resultDateFrame.foreachPartition(partitionRecords => {
          val conn = MySQLPoolManager.getMysqlManager.getConnection // borrow a connection from the pool
          val preparedStatement = conn.prepareStatement(sql)
          val metaData = conn.getMetaData.getColumns(null, "%", tableName, "%") // column metadata of the target table
          try {
            conn.setAutoCommit(false)
            partitionRecords.foreach(record => {
              // Note: JDBC parameter indexes start at 1, Row indexes start at 0
              for (i <- 1 to colNumbers) {
                val value = record.get(i - 1)
                val dateType = columnDataTypes(i - 1)
                if (value != null) { // non-null: bind with the setter that matches the Spark type
                  dateType match {
                    case _: ByteType => preparedStatement.setByte(i, record.getAs[Byte](i - 1))
                    case _: ShortType => preparedStatement.setShort(i, record.getAs[Short](i - 1))
                    case _: IntegerType => preparedStatement.setInt(i, record.getAs[Int](i - 1))
                    case _: LongType => preparedStatement.setLong(i, record.getAs[Long](i - 1))
                    case _: BooleanType => preparedStatement.setBoolean(i, record.getAs[Boolean](i - 1))
                    case _: FloatType => preparedStatement.setFloat(i, record.getAs[Float](i - 1))
                    case _: DoubleType => preparedStatement.setDouble(i, record.getAs[Double](i - 1))
                    case _: StringType => preparedStatement.setString(i, record.getAs[String](i - 1))
                    case _: TimestampType => preparedStatement.setTimestamp(i, record.getAs[Timestamp](i - 1))
                    case _: DateType => preparedStatement.setDate(i, record.getAs[Date](i - 1))
                    case _ => throw new RuntimeException(s"nonsupport ${dateType} !!!")
                  }
                } else { // null: bind SQL NULL using the column's JDBC type from the table metadata
                  metaData.absolute(i)
                  preparedStatement.setNull(i, metaData.getInt("DATA_TYPE"))
                }
              }
              preparedStatement.addBatch()
            })
            preparedStatement.executeBatch()
            conn.commit()
          } catch {
            case e: Exception =>
              conn.rollback() // discard the partially written partition
              logger.error(s"@@ saveDFtoDBUsePool ${e.getMessage}", e)
          } finally {
            preparedStatement.close()
            conn.close() // returns the connection to the pool
          }
        })
      }
    
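      /**
        * Builds a positional INSERT statement, e.g. insert into t values(?,?,?)
        * @param tableName  table name
        * @param colNumbers number of columns (one placeholder each)
        * @return the INSERT SQL string
        */
      def getInsertSql(tableName: String, colNumbers: Int): String = {
        var sqlStr = "insert into " + tableName + " values("
        for (i <- 1 to colNumbers) {
          sqlStr += "?"
          if (i != colNumbers) {
            sqlStr += ","
          }
        }
        sqlStr += ")"
        sqlStr
      }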
    
      /** Returns the MySQL connection settings as a tuple (jdbcURL, userName, passWord) */
      def getMySQLInfo: (String, String, String) = {
        val jdbcURL = PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.url")
        val userName = PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.username")
        val passWord = PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.password")
        (jdbcURL, userName, passWord)
      }
    
      /**
        * Loads a MySQL table as a DataFrame
        *
        * @param sqlContext     sqlContext
        * @param mysqlTableName table name
        * @param queryCondition filter condition (optional; null or "" loads the whole table)
        * @return DataFrame
        */
      def getDFFromMysql(sqlContext: SQLContext, mysqlTableName: String, queryCondition: String): DataFrame = {
        val (jdbcURL, userName, passWord) = getMySQLInfo
        val prop = new Properties()
        prop.put("user", userName)
        prop.put("password", passWord)

        if (null == queryCondition || "" == queryCondition)
          sqlContext.read.jdbc(jdbcURL, mysqlTableName, prop)
        else
          sqlContext.read.jdbc(jdbcURL, mysqlTableName, prop).where(queryCondition)
      }
    
      /**
        * Drops a table
        * @param sqlContext     not used; kept for API compatibility
        * @param mysqlTableName table name
        * @return true if the statement ran without error
        */
      def dropMysqlTable(sqlContext: SQLContext, mysqlTableName: String): Boolean = {
        val conn = MySQLPoolManager.getMysqlManager.getConnection // borrow a connection from the pool
        val statement = conn.createStatement()
        try {
          // Statement.execute returns false for DDL (no ResultSet), so report success explicitly
          statement.execute(s"drop table $mysqlTableName")
          true
        } catch {
          case e: Exception =>
            logger.error(s"mysql dropMysqlTable error: ${e.getMessage}")
            false
        } finally {
          statement.close()
          conn.close()
        }
      }
    
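      /**
        * Deletes rows from a table
        * @param sqlContext     not used; kept for API compatibility
        * @param mysqlTableName table name
        * @param condition      WHERE clause without the WHERE keyword; it is spliced into the SQL as-is,
        *                       so it must never contain untrusted input
        * @return true if the statement ran without error
        */
      def deleteMysqlTableData(sqlContext: SQLContext, mysqlTableName: String, condition: String): Boolean = {
        val conn = MySQLPoolManager.getMysqlManager.getConnection // borrow a connection from the pool
        val statement = conn.createStatement()
        try {
          // Statement.execute returns false for DELETE (no ResultSet), so report success explicitly
          statement.execute(s"delete from $mysqlTableName where $condition")
          true
        } catch {
          case e: Exception =>
            logger.error(s"mysql deleteMysqlTable error: ${e.getMessage}")
            false
        } finally {
          statement.close()
          conn.close()
        }
      }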
    
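      /**
        * Saves a DataFrame to MySQL, creating the table first if it does not exist
        * @param tableName       table name
        * @param resultDateFrame DataFrame to save
        */
      def saveDFtoDBCreateTableIfNotExist(tableName: String, resultDateFrame: DataFrame): Unit = {
        // If the table is missing, create it from the DataFrame schema
        createTableIfNotExist(tableName, resultDateFrame)
        // Check that the table and the DataFrame have the same column count, names, and order
        verifyFieldConsistency(tableName, resultDateFrame)
        // Write the DataFrame
        saveDFtoDBUsePool(tableName, resultDateFrame)
      }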
    
    
      /**
        * Builds an INSERT ... ON DUPLICATE KEY UPDATE statement
        * @param tableName     table name
        * @param cols          all DataFrame columns (one placeholder each)
        * @param updateColumns columns to overwrite when the key already exists (must not be empty)
        * @return the SQL string
        */
      def getInsertOrUpdateSql(tableName: String, cols: Array[String], updateColumns: Array[String]): String = {
        require(updateColumns.nonEmpty, "updateColumns must not be empty")
        val placeholders = cols.map(_ => "?").mkString(",")
        val updates = updateColumns.map(col => s"$col = ?").mkString(",")
        s"insert into $tableName values($placeholders) ON DUPLICATE KEY UPDATE $updates"
      }
    
      /**
        * Writes a DataFrame to MySQL with INSERT ... ON DUPLICATE KEY UPDATE.
        * Note: the table must have a primary key (or unique index), otherwise every row is simply inserted.
        * @param tableName       table name
        * @param resultDateFrame DataFrame to write
        * @param updateColumns   columns to update when the key already exists
        */
      def insertOrUpdateDFtoDBUsePool(tableName: String, resultDateFrame: DataFrame, updateColumns: Array[String]): Unit = {
        val colNumbers = resultDateFrame.columns.length
        val sql = getInsertOrUpdateSql(tableName, resultDateFrame.columns, updateColumns)
        val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
        logger.info("############## sql = " + sql)
        resultDateFrame.foreachPartition(partitionRecords => {
          val conn = MySQLPoolManager.getMysqlManager.getConnection // borrow a connection from the pool
          val preparedStatement = conn.prepareStatement(sql)
          val metaData = conn.getMetaData.getColumns(null, "%", tableName, "%") // column metadata of the target table
          try {
            conn.setAutoCommit(false)
            partitionRecords.foreach(record => {
              // Note: JDBC parameter indexes start at 1, Row indexes start at 0
              for (i <- 1 to colNumbers) {
                val value = record.get(i - 1)
                val dateType = columnDataTypes(i - 1)
                if (value != null) { // non-null: bind with the setter that matches the Spark type
                  dateType match {
                    case _: ByteType => preparedStatement.setByte(i, record.getAs[Byte](i - 1))
                    case _: ShortType => preparedStatement.setShort(i, record.getAs[Short](i - 1))
                    case _: IntegerType => preparedStatement.setInt(i, record.getAs[Int](i - 1))
                    case _: LongType => preparedStatement.setLong(i, record.getAs[Long](i - 1))
                    case _: BooleanType => preparedStatement.setInt(i, if (record.getAs[Boolean](i - 1)) 1 else 0)
                    case _: FloatType => preparedStatement.setFloat(i, record.getAs[Float](i - 1))
                    case _: DoubleType => preparedStatement.setDouble(i, record.getAs[Double](i - 1))
                    case _: StringType => preparedStatement.setString(i, record.getAs[String](i - 1))
                    case _: TimestampType => preparedStatement.setTimestamp(i, record.getAs[Timestamp](i - 1))
                    case _: DateType => preparedStatement.setDate(i, record.getAs[Date](i - 1))
                    case _ => throw new RuntimeException(s"nonsupport ${dateType} !!!")
                  }
                } else { // null: bind SQL NULL using the column's JDBC type from the table metadata
                  metaData.absolute(i)
                  preparedStatement.setNull(i, metaData.getInt("DATA_TYPE"))
                }
              }
              // Bind the values of the columns to update (parameters colNumbers + 1 onward)
              for (i <- 1 to updateColumns.length) {
                val fieldIndex = record.fieldIndex(updateColumns(i - 1))
                val value = record.get(fieldIndex)
                val dataType = columnDataTypes(fieldIndex)
                logger.debug(s"@@ $fieldIndex, $value, $dataType")
                if (value != null) {
                  dataType match {
                    case _: ByteType => preparedStatement.setByte(colNumbers + i, record.getAs[Byte](fieldIndex))
                    case _: ShortType => preparedStatement.setShort(colNumbers + i, record.getAs[Short](fieldIndex))
                    case _: IntegerType => preparedStatement.setInt(colNumbers + i, record.getAs[Int](fieldIndex))
                    case _: LongType => preparedStatement.setLong(colNumbers + i, record.getAs[Long](fieldIndex))
                    case _: BooleanType => preparedStatement.setBoolean(colNumbers + i, record.getAs[Boolean](fieldIndex))
                    case _: FloatType => preparedStatement.setFloat(colNumbers + i, record.getAs[Float](fieldIndex))
                    case _: DoubleType => preparedStatement.setDouble(colNumbers + i, record.getAs[Double](fieldIndex))
                    case _: StringType => preparedStatement.setString(colNumbers + i, record.getAs[String](fieldIndex))
                    case _: TimestampType => preparedStatement.setTimestamp(colNumbers + i, record.getAs[Timestamp](fieldIndex))
                    case _: DateType => preparedStatement.setDate(colNumbers + i, record.getAs[Date](fieldIndex))
                    case _ => throw new RuntimeException(s"nonsupport ${dataType} !!!")
                  }
                } else {
                  // The metadata has one row per table column, so look up the source column (fieldIndex + 1),
                  // not colNumbers + i, which would point past the last row
                  metaData.absolute(fieldIndex + 1)
                  preparedStatement.setNull(colNumbers + i, metaData.getInt("DATA_TYPE"))
                }
              }
              preparedStatement.addBatch()
            })
            preparedStatement.executeBatch()
            conn.commit()
          } catch {
            case e: Exception =>
              conn.rollback() // discard the partially written partition
              logger.error(s"@@ insertOrUpdateDFtoDBUsePool ${e.getMessage}", e)
          } finally {
            preparedStatement.close()
            conn.close() // returns the connection to the pool
          }
        })
      }
    
    
      /**
        * Creates the table from the DataFrame schema if it does not exist; column order follows the DataFrame.
        * A column named id becomes the primary key (int, auto-increment); other columns are mapped
        * from their Spark DataType to a MySQL type.
        *
        * @param tableName table name
        * @param df        dataFrame
        */
      def createTableIfNotExist(tableName: String, df: DataFrame): Unit = {
        val con = MySQLPoolManager.getMysqlManager.getConnection
        try {
          val metaData = con.getMetaData
          val colResultSet = metaData.getColumns(null, "%", tableName, "%")
          // No columns found means the table does not exist yet, so create it
          if (!colResultSet.next()) {
            // Build the CREATE TABLE statement
            val sb = new StringBuilder(s"CREATE TABLE `$tableName` (")
            df.schema.fields.foreach(x =>
              if (x.name.equalsIgnoreCase("id")) {
                sb.append(s"`${x.name}` int(255) NOT NULL AUTO_INCREMENT PRIMARY KEY,") // id: integer, auto-increment primary key
              } else {
                x.dataType match {
                  case _: ByteType => sb.append(s"`${x.name}` int(100) DEFAULT NULL,")
                  case _: ShortType => sb.append(s"`${x.name}` int(100) DEFAULT NULL,")
                  case _: IntegerType => sb.append(s"`${x.name}` int(100) DEFAULT NULL,")
                  case _: LongType => sb.append(s"`${x.name}` bigint(100) DEFAULT NULL,")
                  case _: BooleanType => sb.append(s"`${x.name}` tinyint DEFAULT NULL,")
                  case _: FloatType => sb.append(s"`${x.name}` float(50) DEFAULT NULL,")
                  case _: DoubleType => sb.append(s"`${x.name}` double DEFAULT NULL,") // MySQL DOUBLE takes (M,D) or nothing
                  case _: StringType => sb.append(s"`${x.name}` varchar(50) DEFAULT NULL,") // values over 50 chars will not fit
                  case _: TimestampType => sb.append(s"`${x.name}` timestamp DEFAULT current_timestamp,")
                  case _: DateType => sb.append(s"`${x.name}` date DEFAULT NULL,")
                  case _ => throw new RuntimeException(s"nonsupport ${x.dataType} !!!")
                }
              }
            )
            sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8")
            // Remove the trailing comma after the last column definition
            val sql_createTable = sb.deleteCharAt(sb.lastIndexOf(",")).toString()
            logger.info(sql_createTable)
            val statement = con.createStatement()
            try statement.execute(sql_createTable) finally statement.close()
          }
        } finally {
          con.close() // return the connection to the pool
        }
      }
    
      /**
        * Verifies that the table and the DataFrame have the same column count, names, and order
        *
        * @param tableName table name
        * @param df        dataFrame
        */
      def verifyFieldConsistency(tableName: String, df: DataFrame): Unit = {
        val con = MySQLPoolManager.getMysqlManager.getConnection
        try {
          val metaData = con.getMetaData
          val colResultSet = metaData.getColumns(null, "%", tableName, "%")
          colResultSet.last()
          val tableFiledNum = colResultSet.getRow
          val dfFiledNum = df.columns.length
          if (tableFiledNum != dfFiledNum) {
            throw new Exception(s"Column count differs between table and DataFrame!! table - $tableFiledNum but dataFrame - $dfFiledNum")
          }
          for (i <- 1 to tableFiledNum) {
            colResultSet.absolute(i)
            val tableFileName = colResultSet.getString("COLUMN_NAME")
            val dfFiledName = df.columns.apply(i - 1)
            if (!tableFileName.equals(dfFiledName)) {
              throw new Exception(s"Column name differs between table and DataFrame!! table - '$tableFileName' but dataFrame - '$dfFiledName'")
            }
          }
          colResultSet.beforeFirst()
        } finally {
          con.close() // return the connection to the pool
        }
      }
    
    } 
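    createTableIfNotExist and the two write methods each hard-code the Spark-to-MySQL type correspondence in their own match expressions. A table-driven sketch keeps it in one place. For each Spark type, it records two things:

    - The MySQL column type used in the DDL.
    - A java.sql.Types code that could be passed to setNull instead of reading DATA_TYPE from the table metadata.

    The map is keyed by Spark's `DataType.typeName` strings (such as "integer" or "timestamp"); check them against your Spark version. The object name and its entries are illustrative assumptions, simplified from the listing above with display widths dropped. The JDBC codes are reasonable choices, not values taken from the article.

```scala
import java.sql.Types

object TypeMapping {
  // MySQL column type for CREATE TABLE plus the java.sql.Types code for PreparedStatement.setNull
  final case class Mapping(ddl: String, sqlType: Int)

  // Keyed by Spark's DataType.typeName; simplified from the match expressions in MySQLUtils
  val bySparkType: Map[String, Mapping] = Map(
    "byte" -> Mapping("INT", Types.INTEGER),
    "short" -> Mapping("INT", Types.INTEGER),
    "integer" -> Mapping("INT", Types.INTEGER),
    "long" -> Mapping("BIGINT", Types.BIGINT),
    "boolean" -> Mapping("TINYINT", Types.TINYINT),
    "float" -> Mapping("FLOAT", Types.REAL),
    "double" -> Mapping("DOUBLE", Types.DOUBLE),
    "string" -> Mapping("VARCHAR(50)", Types.VARCHAR),
    "timestamp" -> Mapping("TIMESTAMP", Types.TIMESTAMP),
    "date" -> Mapping("DATE", Types.DATE)
  )

  private def lookup(sparkTypeName: String): Mapping =
    bySparkType.getOrElse(sparkTypeName, throw new IllegalArgumentException(s"nonsupport $sparkTypeName !!!"))

  // One column definition; a column named id becomes an auto-increment primary key, as in createTableIfNotExist
  def columnDdl(name: String, sparkTypeName: String): String =
    if (name.equalsIgnoreCase("id")) s"`${name}` INT NOT NULL AUTO_INCREMENT PRIMARY KEY"
    else s"`${name}` ${lookup(sparkTypeName).ddl} DEFAULT NULL"

  def nullSqlType(sparkTypeName: String): Int = lookup(sparkTypeName).sqlType

  // fields: (column name, Spark typeName) in DataFrame order
  def createTable(table: String, fields: Seq[(String, String)]): String =
    fields.map { case (n, t) => columnDdl(n, t) }
      .mkString(s"CREATE TABLE `${table}` (", ", ", ") ENGINE=InnoDB DEFAULT CHARSET=utf8")
}
```

    With a real DataFrame, the `fields` argument could be built as `df.schema.fields.map(f => (f.name, f.dataType.typeName)).toSeq`.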
    
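    saveDFtoDBUsePool and insertOrUpdateDFtoDBUsePool add every row of a partition to one JDBC batch and call executeBatch only once. A large partition is therefore buffered entirely in the batch before anything is sent. A common refinement is to flush every N rows.

    The sketch below shows only the chunking logic, using Iterator.grouped. `flush` is a hypothetical callback; in the real methods it would call addBatch for each row of the chunk and then executeBatch. The default batch size of 1000 is an arbitrary assumption. With MySQL Connector/J, adding rewriteBatchedStatements=true to the JDBC URL additionally lets the driver send batched inserts as multi-row statements.

```scala
object ChunkedBatches {
  // Splits a partition iterator into chunks of at most batchSize rows and hands each chunk to flush
  // (e.g. addBatch() per row, then executeBatch() per chunk). Returns the number of rows processed.
  def writeInChunks[T](rows: Iterator[T], batchSize: Int = 1000)(flush: Seq[T] => Unit): Long = {
    require(batchSize > 0, "batchSize must be positive")
    var total = 0L
    rows.grouped(batchSize).foreach { chunk =>
      flush(chunk)
      total += chunk.size
    }
    total
  }
}
```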

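    verifyFieldConsistency compares the table's columns with the DataFrame's by count, name, and order, walking the JDBC metadata ResultSet with last() and absolute(). Separating the comparison from the JDBC calls makes it unit-testable.

    In the sketch below, the table's column names arrive as a plain list; in practice you would collect them from the COLUMN_NAME field of DatabaseMetaData.getColumns. The result is an Either instead of an exception. The object name is hypothetical.

```scala
object FieldConsistency {
  // Right(()) when both lists have the same names in the same order; otherwise Left describing
  // the first problem found (column count first, then the first mismatching position)
  def check(tableColumns: Seq[String], dfColumns: Seq[String]): Either[String, Unit] =
    if (tableColumns.length != dfColumns.length)
      Left(s"column count differs: table - ${tableColumns.length} but dataFrame - ${dfColumns.length}")
    else
      tableColumns.zip(dfColumns).zipWithIndex.collectFirst {
        case ((t, d), i) if t != d => s"column ${i + 1} differs: table - '$t' but dataFrame - '$d'"
      }.toLeft(())
}
```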

      Article link: https://www.haomeiwen.com/subject/hitaqqtx.html