美文网首页
SparkStreaming入门教程(五)输出操作Output

SparkStreaming入门教程(五)输出操作Output

作者: 胖滚猪学编程 | 来源:发表于2018-02-25 19:03 被阅读0次

    Output Operations将DStream的数据推送到外部系统,如数据库或文件系统。类似于RDD的惰性求值,输出操作才会触发计算的实际执行。

    • print()
      在驱动器程序中打印每个批次中的前十个元素,通常用于调试模式。
    • saveAsTextFiles(prefix, [suffix])
      将此DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]”。
    • saveAsObjectFiles(prefix, [suffix])
      将此DStream的内容保存为SequenceFiles序列化的Java对象。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]”
    • saveAsHadoopFiles(prefix, [suffix])
      将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]”
    • foreachRDD(func)
      最通用的输出运算符,将函数func应用于从流中生成的每个RDD。此功能将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或将其写入数据库,func内通常有RDD的action操作

    前面几个都太简单,直接调用方法即可,只演示spark将数据输出插入到mysql数据库的代码

    import java.sql.DriverManager
    import org.apache.spark._
    import org.apache.spark.streaming._
    val ssc = new StreamingContext(sc,Seconds(5))   
    val wordcount = ssc.socketTextStream("localhost", 7799).flatMap(_.split(" ")).map(word => (word, 1))
    wordcount.foreachRDD(wd => wd.foreachPartition(
          data => {
            val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
            try {
              for (row <- data) {
                println("input data is " + row._1 + "  " + row._2)
                val sql = "insert into stream(word,num) values(" + "'" + row._1 + "'," + row._2 + ")"
                conn.prepareStatement(sql).executeUpdate()
              }
            } finally {
              conn.close()
            }
          }))
          
    ssc.start()
    ssc.awaitTermination()
    

    相关文章

      网友评论

          本文标题:SparkStreaming入门教程(五)输出操作Output

          本文链接:https://www.haomeiwen.com/subject/vesztftx.html