The earlier MySQLSink processed rows one at a time via foreach, which could repeatedly open and close connections and performed poorly. This version instead batches the writes through PreparedStatement.addBatch.
import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.Properties

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{ForeachWriter, Row}

import scala.util.control.NonFatal

class MySQLBatchSink(urlName: String,
                     properties: Properties,
                     tableName: String,
                     fieldNames: Array[String],
                     sqlType: SqlType.Value) extends ForeachWriter[Row] with Logging {

  Class.forName(properties.getProperty("driver"))

  var conn: Connection = _
  var ps: PreparedStatement = _

  // One "?" placeholder per column.
  val values: String = fieldNames.map(_ => "?").mkString(",")

  // Build the SQL text once, according to the requested write mode.
  val sqlStr: String = sqlType match {
    case SqlType.Replace => s"REPLACE INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)"
    case SqlType.Update => s"UPDATE `$tableName` SET `${fieldNames.mkString("`=?, `")}`=?"
    case SqlType.Upsert =>
      s"""
         |INSERT INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)
         |ON DUPLICATE KEY UPDATE `${fieldNames.mkString("`=?, `")}`=?
         |""".stripMargin
  }

  // Open one connection per partition/epoch and reuse it for the whole batch.
  override def open(partitionId: Long, epochId: Long): Boolean = {
    conn = DriverManager.getConnection(properties.getProperty(urlName), properties)
    conn.setAutoCommit(false)
    ps = conn.prepareStatement(sqlStr)
    true
  }

  // Bind the row's values and queue the statement; nothing is sent to MySQL yet.
  override def process(value: Row): Unit = {
    for (i <- 0 until value.size) {
      val indexHead: Int = i + 1
      setStatementValue(value, i, indexHead)
      if (sqlType == SqlType.Upsert) {
        // The upsert statement repeats every placeholder in its UPDATE clause.
        val indexTail: Int = indexHead + value.size
        setStatementValue(value, i, indexTail)
      }
    }
    ps.addBatch()
  }

  // Flush the accumulated batch, commit, and release the JDBC resources.
  override def close(errorOrNull: Throwable): Unit = {
    try {
      ps.executeBatch()
      conn.commit()
    } catch {
      case NonFatal(e) => logWarning("Exception committing transaction", e)
    } finally {
      try {
        if (ps != null) ps.close()
      } catch {
        case e: Exception => logWarning("Exception closing prepareStatement", e)
      }
      try {
        if (conn != null) conn.close()
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
    }
  }

  // Bind a single field to the given placeholder index, dispatching on its runtime type.
  private def setStatementValue(value: Row, i: Int, index: Int): Unit = {
    value.get(i) match {
      case v: Int => ps.setInt(index, v)
      case v: Long => ps.setLong(index, v)
      case v: String => ps.setString(index, v)
      case v: Timestamp => ps.setTimestamp(index, v)
      case v: Float => ps.setFloat(index, v)
      case v: Double => ps.setDouble(index, v)
      case v: java.math.BigDecimal => ps.setBigDecimal(index, v)
      case v: Boolean => ps.setBoolean(index, v)
      case v: Byte => ps.setByte(index, v)
      case v: Short => ps.setShort(index, v)
      case null => ps.setNull(index, SparkUtil.sparkTypeToSqlType(value.schema.fields(i).dataType))
      case _ => throw new IllegalArgumentException(s"No support for Spark SQL type ${value.schema.fields(i).dataType}")
    }
  }
}
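The class relies on a SqlType enumeration and a SparkUtil.sparkTypeToSqlType helper defined elsewhere in the project. A minimal sketch of what they could look like (assumed for illustration, not the original definitions):

// Assumed helpers; the real project definitions may differ.
object SqlType extends Enumeration {
  val Replace, Update, Upsert = Value
}

object SparkUtil {
  import java.sql.Types
  import org.apache.spark.sql.types._

  // Maps a Spark SQL DataType to the java.sql.Types constant used when binding NULLs.
  def sparkTypeToSqlType(dataType: DataType): Int = dataType match {
    case IntegerType    => Types.INTEGER
    case LongType       => Types.BIGINT
    case StringType     => Types.VARCHAR
    case TimestampType  => Types.TIMESTAMP
    case FloatType      => Types.FLOAT
    case DoubleType     => Types.DOUBLE
    case _: DecimalType => Types.DECIMAL
    case BooleanType    => Types.BOOLEAN
    case ByteType       => Types.TINYINT
    case ShortType      => Types.SMALLINT
    case _              => Types.OTHER
  }
}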
This way, process only calls addBatch for each row; the batch is actually executed and committed in close (i.e., at the end of the current micro-batch), after which the connection is closed.
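For context, a hedged sketch of how such a sink is typically wired into a Structured Streaming query; the input DataFrame df, the property keys, the table and field names, and the checkpoint path below are illustrative assumptions, not part of the original code:

// Illustrative wiring only; values below are assumptions for the example.
val jdbcProps = new java.util.Properties()
jdbcProps.setProperty("driver", "com.mysql.cj.jdbc.Driver")
jdbcProps.setProperty("url", "jdbc:mysql://localhost:3306/test")
jdbcProps.setProperty("user", "root")
jdbcProps.setProperty("password", "secret")

val query = df.writeStream
  .foreach(new MySQLBatchSink(
    urlName = "url",
    properties = jdbcProps,
    tableName = "user_stats",
    fieldNames = Array("id", "name", "updated_at"),
    sqlType = SqlType.Upsert))
  .option("checkpointLocation", "/tmp/checkpoints/mysql-batch-sink")
  .start()

Each micro-batch then opens one connection per partition in open, queues every row in process, and writes the whole batch in a single commit in close.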