1.foreachBatch
Since Spark 2.4, foreachBatch lets you call the JDBC batch writer that Spark SQL already supports to write each micro-batch to MySQL, as follows:
/* Spark 2.4+: foreachBatch */
import com.xxx.bigdata.utils.PropertyConstants
import org.apache.spark.sql.{DataFrame, SaveMode}

val connectionProperties = PropertyConstants.getProperties()
resultDF
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // reuse Spark SQL's JDBC writer for every micro-batch
    batchDF.write.mode(SaveMode.Append)
      .jdbc(connectionProperties.getProperty("url"), "tableName", connectionProperties)
  }
  .outputMode("update")
  .start()
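PropertyConstants.getProperties() is a project-specific helper that loads the JDBC settings. A minimal sketch of equivalent properties (the host, user and password values are placeholders; "user", "password" and "driver" are standard JDBC keys, and "url" is read explicitly by the code above) might look like:

import java.util.Properties

// Illustrative stand-in for PropertyConstants.getProperties(); values are placeholders
val connectionProperties = new Properties()
connectionProperties.setProperty("url", "jdbc:mysql://localhost:3306/test")
connectionProperties.setProperty("user", "root")
connectionProperties.setProperty("password", "******")
connectionProperties.setProperty("driver", "com.mysql.jdbc.Driver")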
2.foreach
However, that batch writer can only append or overwrite; it cannot update existing rows by a unique key, so a custom sink is needed.
import java.sql.{Connection, DriverManager, Timestamp}
import com.xxx.bigdata.utils.PropertyConstants
import org.apache.spark.sql.{ForeachWriter, Row}
import scala.collection.mutable.ArrayBuffer

/** Writes each row with REPLACE INTO, so rows are upserted on the table's primary/unique key. */
class MySQLSink(tableName: String, fieldNames: Array[String]) extends ForeachWriter[Row] {
  val connectionProperties = PropertyConstants.getProperties()
  var conn: Connection = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    // open one connection per partition and epoch
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(connectionProperties.getProperty("url"), connectionProperties)
    conn.setAutoCommit(false)
    true
  }

  override def process(value: Row): Unit = {
    // build the placeholder list (?,?,...) to match the row width
    val values = ArrayBuffer[String]()
    value.toSeq.foreach(_ => values += "?")
    val ps = conn.prepareStatement(
      s"""
         |replace into $tableName${fieldNames.mkString("(", ",", ")")}
         |values${values.mkString("(", ",", ")")}
       """.stripMargin)
    // bind each column by its runtime type; extend the match for other column types as needed
    for (i <- 0 until value.size) {
      value.get(i) match {
        case v: Int => ps.setInt(i + 1, v)
        case v: Long => ps.setLong(i + 1, v)
        case v: Float => ps.setFloat(i + 1, v)
        case v: Double => ps.setDouble(i + 1, v)
        case v: String => ps.setString(i + 1, v)
        case v: Timestamp => ps.setTimestamp(i + 1, v)
      }
    }
    ps.execute()
    conn.commit()
    ps.close()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (conn != null) conn.close()
  }
}
Usage:
/* use the custom MySQLSink */
val mysqlSink = new MySQLSink("tableName", resultDF.schema.fieldNames)
resultDF
  .writeStream
  .outputMode("update")
  .foreach(mysqlSink)
  .start()
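Note that start() only launches the streaming query; in a standalone application you would normally block on the returned handle, for example:

// keep the driver alive until the streaming query stops or fails
val query = resultDF.writeStream.outputMode("update").foreach(mysqlSink).start()
query.awaitTermination()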
3.Extensions
The sink above inserts one row at a time. It can be extended to batch the writes (accumulate rows in process and flush them in close), and the type matching can be extended to cover more column types, as sketched below.
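For example, a minimal sketch of a batched variant (the class name BatchedMySQLSink is illustrative, not from the original code): prepare the REPLACE statement once in open, accumulate rows with addBatch in process, and flush with executeBatch and commit in close.

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import com.xxx.bigdata.utils.PropertyConstants
import org.apache.spark.sql.{ForeachWriter, Row}

// Sketch of a batched sink: same REPLACE INTO statement, but rows are
// accumulated with addBatch() and flushed once per partition in close().
class BatchedMySQLSink(tableName: String, fieldNames: Array[String]) extends ForeachWriter[Row] {
  val connectionProperties = PropertyConstants.getProperties()
  var conn: Connection = _
  var ps: PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(connectionProperties.getProperty("url"), connectionProperties)
    conn.setAutoCommit(false)
    // prepare the statement once per partition instead of once per row
    val placeholders = fieldNames.map(_ => "?").mkString("(", ",", ")")
    ps = conn.prepareStatement(
      s"replace into $tableName${fieldNames.mkString("(", ",", ")")} values$placeholders")
    true
  }

  override def process(value: Row): Unit = {
    for (i <- 0 until value.size) {
      value.get(i) match {
        case v: Int => ps.setInt(i + 1, v)
        case v: Long => ps.setLong(i + 1, v)
        case v: Float => ps.setFloat(i + 1, v)
        case v: Double => ps.setDouble(i + 1, v)
        case v: String => ps.setString(i + 1, v)
        case v: Timestamp => ps.setTimestamp(i + 1, v)
        case other => ps.setObject(i + 1, other) // fallback for types not matched above
      }
    }
    ps.addBatch()
  }

  override def close(errorOrNull: Throwable): Unit = {
    // only flush and commit if the partition finished without error
    if (errorOrNull == null && ps != null) {
      ps.executeBatch()
      conn.commit()
    }
    if (conn != null) conn.close()
  }
}

Committing only when errorOrNull is null avoids writing a partial batch for a failed partition; the uncommitted rows are simply discarded when the task is retried.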