美文网首页
spark写数据到mysql第二弹

spark写数据到mysql第二弹

作者: M2shad0w | 来源:发表于2016-01-28 11:55 被阅读0次

    需求

    分析的中间数据, 很多需要持久化到关系型数据库, 以便后续的二次分析, 在官方给出insert指定字段的接口之前我先实现自己的方法吧。

    背景

    之前有一篇文章 spark SQL操作之关系型数据库

    简单讲解了spark 写mysql的接口。

    逐行指定字段写入数据库, 我们必须能拿到每一行数据。在spark SQL 操作完的对象是一个 RDD, spark SQL scala api说明 上有一个api:

    def foreachPartition(f: (Iterator[T]) ⇒ Unit):Unit
    // Applies a function f to each partition of this RDD.
    

    因此我们只需要实现一个参数是 Iterator

    类型的函数, 就能取出每一行的数据。

    代码实现实例

    闭包

    val keyWords = sqlContext.sql("your sql ")
    // scala 闭包, 传参数
    def keyWordsr2mysql(iter: Iterator[org.apache.spark.sql.Row]): Unit = {
    val keyTags = Array("count_start_time", "kind")
    val tags = Array("word", "count")
    write2mysql(iter, "keyWordCount", keyTags, tags, sStartTime, kind)
    }
    

    抽象写数据库

    将数据库表名, 指定字段等以参数形式传入, 接口更抽象通用

    def write2mysql(iter :Iterator[org.apache.spark.sql.Row], sTable: String, keytags :Array[String], tags :Array[String], args:Any*): Unit = {
    // your code
    }

    相关文章

      网友评论

          本文标题:spark写数据到mysql第二弹

          本文链接:https://www.haomeiwen.com/subject/qscjkttx.html