Output Operations将DStream的数据推送到外部系统,如数据库或文件系统。类似于RDD的惰性求值,输出操作才会触发计算的实际执行。
- print()
在驱动器程序中打印每个批次中的前十个元素,通常用于调试模式。 - saveAsTextFiles(prefix, [suffix])
将此DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]”。 - saveAsObjectFiles(prefix, [suffix])
将此DStream的内容保存为SequenceFiles序列化的Java对象。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]” - saveAsHadoopFiles(prefix, [suffix])
将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀:“prefix-TIME_IN_MS [.suffix]” - foreachRDD(func)
最通用的输出运算符,将函数func应用于从流中生成的每个RDD。此功能将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或将其写入数据库,func内通常有RDD的action操作
前面几个都太简单,直接调用方法即可,只演示spark将数据输出插入到mysql数据库的代码
import java.sql.DriverManager
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc,Seconds(5))
val wordcount = ssc.socketTextStream("localhost", 7799).flatMap(_.split(" ")).map(word => (word, 1))
wordcount.foreachRDD(wd => wd.foreachPartition(
data => {
val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
try {
for (row <- data) {
println("input data is " + row._1 + " " + row._2)
val sql = "insert into stream(word,num) values(" + "'" + row._1 + "'," + row._2 + ")"
conn.prepareStatement(sql).executeUpdate()
}
} finally {
conn.close()
}
}))
ssc.start()
ssc.awaitTermination()
网友评论