In production, the cached file would typically live on a big-data file system (e.g. HDFS); since this example is for learning on a local machine, a local file is used instead.
1. Code
package org.bbk.flink

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object Demo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Register the file with the distributed cache under the name "advert".
    // Flink ships a copy to every TaskManager before the job starts.
    env.registerCachedFile("D:\\tmp\\count.txt", "advert")
    val data = env.fromElements("hello", "flink", "spark", "dataset")
    val result = data.map(new RichMapFunction[String, String] {
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        // Look up the cached file by its registered name; this is a local
        // copy on the TaskManager, so ordinary file I/O works.
        val myFile = getRuntimeContext.getDistributedCache.getFile("advert")
        val lines = FileUtils.readLines(myFile, "UTF-8")
        val it = lines.iterator()
        while (it.hasNext) {
          val line = it.next()
          println("line: " + line)
        }
      }
      override def map(value: String): String = value
    }).setParallelism(2)
    // In the DataSet API, print() triggers job execution itself,
    // so a separate env.execute() call is not needed here.
    result.print()
  }
}
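Switching to a production-style setup only changes the registered path: registerCachedFile accepts any URI Flink's file systems can resolve, including HDFS. Below is a minimal sketch of that variant, which also loads the cached lines once in open() into an in-memory set and consults it per record, a typical enrichment pattern. The object name CachedLookupDemo and the HDFS address hdfs://namenode:9000/tmp/count.txt are placeholders, not from the original post.

package org.bbk.flink

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

object CachedLookupDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Placeholder HDFS URI; a local path or any other supported
    // file system URI works the same way.
    env.registerCachedFile("hdfs://namenode:9000/tmp/count.txt", "advert")
    val data = env.fromElements("hello", "flink", "spark", "dataset")
    val result = data.map(new RichMapFunction[String, String] {
      // Loaded once per task in open(), then reused for every record.
      var cached: Set[String] = Set.empty
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val myFile = getRuntimeContext.getDistributedCache.getFile("advert")
        cached = FileUtils.readLines(myFile, "UTF-8").asScala.toSet
      }
      // Tag each element according to whether it appears in the cached file.
      override def map(value: String): String =
        if (cached.contains(value)) value + ":hit" else value + ":miss"
    })
    result.print()
  }
}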