美文网首页大数据工具包
Structured Streaming自定义MySQLSink

Structured Streaming自定义MySQLSink

作者: 0o青团o0 | 来源:发表于2019-12-11 13:52 被阅读0次

    之前使用过foreach单条处理的MySQLSink,可能导致连续开关连接,性能较差,故通过prepareStatement的addBatch批量处理数据。

    /**
     * A Structured Streaming sink that batches rows into MySQL via JDBC.
     *
     * Rows are bound with `addBatch` in `process` and flushed in a single
     * `executeBatch` + `commit` in `close`, avoiding a connection/statement
     * round-trip per row.
     *
     * @param urlName    key in `properties` holding the JDBC URL
     * @param properties JDBC properties; must contain `driver` and the URL under `urlName`
     * @param tableName  target MySQL table
     * @param fieldNames column names, in the order values appear in each Row
     * @param sqlType    which statement template to use (Replace / Update / Upsert)
     */
    class MySQLBatchSink(urlName: String, properties: Properties,
                         tableName: String,
                         fieldNames: Array[String],
                         sqlType: SqlType.Value) extends ForeachWriter[Row]() with Logging {
      var conn: Connection = _
      var ps: PreparedStatement = _

      // One "?" placeholder per column, e.g. "?,?,?".
      val values: String = fieldNames.map(_ => "?").mkString(",")

      // Statement template selected by sqlType. Upsert binds every column twice:
      // once for the INSERT values and once for the ON DUPLICATE KEY UPDATE part.
      // NOTE(review): the Update variant has no WHERE clause and therefore updates
      // EVERY row in the table — confirm this is intentional before using it.
      val sqlStr: String = sqlType match {
        case SqlType.Replace => s"REPLACE INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)"
        case SqlType.Update => s"UPDATE `$tableName` SET `${fieldNames.mkString("`=?, `")}`=?"
        case SqlType.Upsert =>
          s"""
             |INSERT INTO `$tableName` (`${fieldNames.mkString("`,`")}`) VALUES ($values)
             |ON DUPLICATE KEY UPDATE `${fieldNames.mkString("`=?, `")}`=?
             |""".stripMargin
      }

      /**
       * Opens one JDBC connection per partition/epoch and prepares the batched statement.
       *
       * The driver class is loaded here rather than in the constructor: ForeachWriter
       * instances are serialized to executors and deserialized WITHOUT re-running the
       * constructor, so constructor-time `Class.forName` never happens where the
       * connection is actually opened.
       */
      override def open(partitionId: Long, epochId: Long): Boolean = {
        Class.forName(properties.getProperty("driver"))
        conn = DriverManager.getConnection(properties.getProperty(urlName), properties)
        conn.setAutoCommit(false) // the whole batch is committed once, in close()
        ps = conn.prepareStatement(sqlStr)
        true
      }

      /** Binds one row's values and queues it with addBatch — no network round-trip here. */
      override def process(value: Row): Unit = {
        val width = value.size
        for (i <- 0 until width) {
          setParameter(value, i, i + 1) // JDBC parameter indices are 1-based
          // Upsert statements repeat every column for the ON DUPLICATE KEY UPDATE clause.
          if (sqlType == SqlType.Upsert) {
            setParameter(value, i, i + 1 + width)
          }
        }
        ps.addBatch()
      }

      /**
       * Flushes the batch and releases JDBC resources.
       *
       * The batch is executed and committed only when the task succeeded
       * (`errorOrNull == null`); on failure the transaction is rolled back so a
       * failed epoch never persists a partial batch. Resources are closed in a
       * `finally` regardless of the outcome.
       */
      override def close(errorOrNull: Throwable): Unit = {
        try {
          if (errorOrNull == null) {
            // Guard against open() having failed before prepareStatement succeeded.
            if (ps != null) {
              ps.executeBatch()
              conn.commit()
            }
          } else {
            logWarning("Task failed, rolling back batch", errorOrNull)
            if (conn != null) conn.rollback()
          }
        } catch {
          case NonFatal(e) => logWarning("Exception committing transaction", e)
        } finally {
          try {
            if (ps != null) ps.close()
          } catch {
            case NonFatal(e) => logWarning("Exception closing prepareStatement", e)
          }

          try {
            if (conn != null) conn.close()
          } catch {
            case NonFatal(e) => logWarning("Exception closing connection", e)
          }
        }
      }

      /**
       * Binds column `i` of `value` to JDBC parameter `index`, dispatching to the
       * JDBC setter matching the runtime type. Nulls are bound with the SQL type
       * derived from the Row's schema; unsupported types fail fast.
       */
      private def setParameter(value: Row, i: Int, index: Int): Unit = {
        value.get(i) match {
          case v: Int => ps.setInt(index, v)
          case v: Long => ps.setLong(index, v)
          case v: String => ps.setString(index, v)
          case v: Timestamp => ps.setTimestamp(index, v)
          case v: Float => ps.setFloat(index, v)
          case v: Double => ps.setDouble(index, v)
          case v: java.math.BigDecimal => ps.setBigDecimal(index, v)
          case v: Boolean => ps.setBoolean(index, v)
          case v: Byte => ps.setByte(index, v)
          case v: Short => ps.setShort(index, v)
          case null => ps.setNull(index, SparkUtil.sparkTypeToSqlType(value.schema.fields(i).dataType))
          case _ => throw new IllegalArgumentException(s"No support for Spark SQL type ${value.schema.fields(i).dataType}")
        }
      }
    }
    

    这样在process按条处理时只是addBatch,真正提交是在close时(即当前批次结束)才执行并关闭连接。

    相关文章

      网友评论

        本文标题:Structured Streaming自定义MySQLSink

        本文链接:https://www.haomeiwen.com/subject/htmogctx.html