美文网首页大数据工具包
Structured Streaming自定义MySQLSink

Structured Streaming自定义MySQLSink

作者: 0o青团o0 | 来源:发表于2019-04-04 11:29 被阅读0次

1.foreachBatch
Spark 2.4 及以后的版本可以直接使用 foreachBatch,在每个微批次上调用 Spark SQL 内置的 JDBC 写入接口批量写入 MySQL,如下:

/* Spark 2.4+: hand every micro-batch to the built-in JDBC writer through foreachBatch */
val connectionProperties = PropertyConstants.getProperties()
resultDF
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // `write` is declared on Dataset without a parameter list, so it must be
    // accessed without parentheses; `batchDF.write()` does not compile in Scala.
    batchDF.write
      .mode(SaveMode.Append)
      .jdbc(connectionProperties.getProperty("url"), "tableName", connectionProperties)
  }
  .outputMode("Update")
  .start()

2.foreach
但是 JDBC 批量写入的保存模式只有 append 或 overwrite 等,无法按唯一键更新已有数据,因此需要自定义 sink。

import java.sql.{Connection, DriverManager, Timestamp}

import com.xxx.bigdata.utils.PropertyConstants
import org.apache.spark.sql.{ForeachWriter, Row}

import scala.collection.mutable.ArrayBuffer

/**
 * A [[ForeachWriter]] that writes every streaming [[Row]] to a MySQL table with a
 * `replace into` statement, so rows are replaced rather than duplicated
 * (presumably keyed on the table's primary/unique key -- verify against the table DDL).
 *
 * A JDBC connection is opened in `open`, each row is written and committed on its own
 * in `process`, and the connection is released in `close`.
 *
 * @param tableName  name of the target table
 * @param fieldNames column names, expected to be in the same order as the row values
 */
class MySQLSink(tableName: String, fieldNames: Array[String]) extends ForeachWriter[Row]() {
  val connectionProperties = PropertyConstants.getProperties()
  var conn: Connection = _

  /**
   * Loads the MySQL driver and opens a connection with auto-commit disabled.
   *
   * @return always `true`, i.e. every partition/epoch is processed
   */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(connectionProperties.getProperty("url"),
      connectionProperties)
    conn.setAutoCommit(false)
    true
  }

  /**
   * Writes a single row with `replace into` and commits it immediately.
   *
   * @param value the row to write; one `?` placeholder is generated per row value
   */
  override def process(value: Row): Unit = {
    val placeholders = Seq.fill(value.size)("?").mkString("(", ",", ")")
    val ps = conn.prepareStatement(
      s"""
         |replace into $tableName${fieldNames.mkString("(", ",", ")")}
         |values$placeholders
       """.stripMargin)

    // The statement is created per row, so it has to be closed per row as well;
    // otherwise every processed row leaks a server/driver-side statement handle.
    try {
      for (i <- 0 until value.size) {
        value.get(i) match {
          // A null column value matches none of the typed cases below and used to
          // fail with scala.MatchError; bind it as SQL NULL instead.
          case null => ps.setNull(i + 1, java.sql.Types.NULL)
          case v: Int => ps.setInt(i + 1, v)
          case v: Long => ps.setLong(i + 1, v)
          case v: Float => ps.setFloat(i + 1, v)
          case v: Double => ps.setDouble(i + 1, v)
          case v: String => ps.setString(i + 1, v)
          case v: Timestamp => ps.setTimestamp(i + 1, v)
          // Any other type (Boolean, java.sql.Date, BigDecimal, ...) is delegated to
          // the driver's own type mapping rather than failing with MatchError.
          case other => ps.setObject(i + 1, other)
        }
      }
      ps.execute()
      conn.commit()
    } finally {
      ps.close()
    }
  }

  /**
   * Closes the JDBC connection if one was opened.
   *
   * @param errorOrNull the error that ended processing, or `null` on success (unused)
   */
  override def close(errorOrNull: Throwable): Unit = {
    if (conn != null) {
      conn.close()
    }
  }
}

调用:

    /* Plug the custom MySQLSink into the streaming query */
    val mysqlSink = new MySQLSink(tableName = "tableName", fieldNames = resultDF.schema.fieldNames)
    resultDF.writeStream
      .outputMode("update")
      .foreach(mysqlSink)
      .start

3.功能扩展
以上实现为逐条写入、逐条提交;如需提高吞吐,可以在 process 中用 addBatch 累积数据,在 close 中统一 executeBatch 并提交,从而实现批量写入。类型匹配的分支也可以按需扩展。

相关文章

网友评论

    本文标题:Structured Streaming自定义MySQLSink

    本文链接:https://www.haomeiwen.com/subject/folfiqtx.html