美文网首页大数据工具包
Structured Streaming自定义MySQLSink

Structured Streaming自定义MySQLSink

作者: 0o青团o0 | 来源:发表于2019-04-04 11:29 被阅读0次

    1. foreachBatch
    Spark 2.4 起可以使用 foreachBatch:在每个微批(micro-batch)上直接调用 Spark SQL 自带的 JDBC 数据源,将数据批量写入 MySQL,如下:

    // Spark 2.4+: write every micro-batch through Spark SQL's built-in JDBC batch writer.
    // SaveMode.Append only inserts rows; it cannot update existing rows by primary/unique key.
    val connectionProperties = PropertyConstants.getProperties()
    // Declared as an explicitly typed Scala function value so `foreachBatch` resolves to the
    // Scala overload; an inline lambda can be ambiguous with the Java `VoidFunction2` overload
    // when compiling against Scala 2.12.
    val writeBatchToMySQL: (DataFrame, Long) => Unit = (batchDF, batchId) =>
      batchDF.write // `write` is a parameterless method: `write()` does not compile in Scala
        .mode(SaveMode.Append)
        .jdbc(connectionProperties.getProperty("url"), "tableName", connectionProperties)
    resultDF
      .writeStream
      .foreachBatch(writeBatchToMySQL)
      .outputMode("Update")
      .start
    

    2.foreach
    但是 DataFrameWriter 的 JDBC 批量写入只提供 Append、Overwrite 等保存模式(SaveMode),无法按主键/唯一键更新(upsert)已存在的数据,因此需要自定义 sink。

    import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp, Types}
    
    import com.xxx.bigdata.utils.PropertyConstants
    import org.apache.spark.sql.{ForeachWriter, Row}
    
    import scala.collection.mutable.ArrayBuffer
    
    /**
     * A [[ForeachWriter]] that writes each streaming row into a MySQL table with `REPLACE INTO`,
     * committing after every row.
     *
     * @param tableName  target table; interpolated into the SQL text unescaped, so it must be trusted
     * @param fieldNames column names, in the same order as the fields of each incoming [[Row]]
     */
    class MySQLSink(tableName: String, fieldNames: Array[String]) extends ForeachWriter[Row]() {
      val connectionProperties = PropertyConstants.getProperties()
      var conn: Connection = _
      // Prepared once per open()/close() cycle and reused for every row, instead of preparing
      // (and never closing) a new statement per row.
      @transient private var stmt: PreparedStatement = _
    
      /** Parameterized upsert: one `?` placeholder per column in `fieldNames`. */
      private def upsertSql: String = {
        val columns = fieldNames.mkString("(", ",", ")")
        val placeholders = fieldNames.map(_ => "?").mkString("(", ",", ")")
        s"replace into $tableName$columns values$placeholders"
      }
    
      /** Opens a JDBC connection (manual commit) and prepares the upsert statement. */
      override def open(partitionId: Long, epochId: Long): Boolean = {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection(connectionProperties.getProperty("url"),
          connectionProperties)
        conn.setAutoCommit(false)
        stmt = conn.prepareStatement(upsertSql)
        true
      }
    
      /** Binds every field of `value` to the prepared statement, executes it and commits. */
      override def process(value: Row): Unit = {
        for (i <- 0 until value.size) {
          val idx = i + 1 // JDBC parameter indexes are 1-based
          value.get(i) match {
            // A null column matches none of the typed cases below, so handle it explicitly.
            case null => stmt.setNull(idx, Types.NULL)
            case v: Int => stmt.setInt(idx, v)
            case v: Long => stmt.setLong(idx, v)
            case v: Float => stmt.setFloat(idx, v)
            case v: Double => stmt.setDouble(idx, v)
            case v: String => stmt.setString(idx, v)
            case v: Timestamp => stmt.setTimestamp(idx, v)
            // Other types (e.g. Short, Boolean, java.math.BigDecimal, java.sql.Date) are handed
            // to the driver instead of failing with a MatchError.
            case other => stmt.setObject(idx, other)
          }
        }
        stmt.execute()
        conn.commit()
      }
    
      /** Releases the statement and connection; safe to call when either was never created. */
      override def close(errorOrNull: Throwable): Unit = {
        try {
          if (stmt != null) stmt.close()
        } finally {
          if (conn != null) conn.close()
        }
      }
    }
    

    调用:

        // Use the custom MySQLSink: every row of each update is written with REPLACE INTO.
        val upsertSink = new MySQLSink("tableName", resultDF.schema.fieldNames)
        resultDF.writeStream
          .foreach(upsertSink)
          .outputMode("update")
          .start()
    

    3.功能扩展
    上面的实现是逐条写入、逐条提交;如需批量写入,可在 process 中用 addBatch 累积数据,再在 close 中调用 executeBatch 并统一提交。类型匹配也可以按需扩展。

    相关文章

      网友评论

        本文标题:Structured Streaming自定义MySQLSink

        本文链接:https://www.haomeiwen.com/subject/folfiqtx.html