Flink Examples:Batch examples
官网链接:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/examples.html
以下示例程序展示了Flink的多种应用,从简单的单词计数到图算法。代码示例说明了Flink的DataSet API的使用。
运行一个demo
最简单的demo就是直接运行WordCount了。
./bin/start-cluster.sh
## 使用内置的默认数据
./bin/flink run ./examples/batch/WordCount.jar
## 可以添加参数
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
WordCount
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
/**
 * Batch WordCount example built on the Flink DataSet API.
 *
 * Recognised command-line options:
 *  - `--input <path>`  text file to read; when absent, a small built-in
 *    sample text is used so the example runs on any machine.
 *  - `--output <path>` destination for the result; when absent, the
 *    result is printed to stdout.
 */
object WordCount {

  /**
   * Sample text used when no `--input` option is supplied.
   *
   * Replaces a machine-specific absolute file path so the default run does
   * not depend on a file that only exists on one developer's machine.
   */
  private val DefaultText: Seq[String] = Seq(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,",
    "And by opposing end them?"
  )

  /**
   * Entry point: parses the arguments, builds the word-count pipeline and
   * either writes the result to a file or prints it.
   *
   * @param args command-line arguments, parsed with `ParameterTool.fromArgs`
   */
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register the parsed parameters as global job parameters.
    env.getConfig.setGlobalJobParameters(params)

    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        env.fromCollection(DefaultText)
      }

    // Lower-case each line, split on runs of non-word characters, drop empty
    // tokens, then count occurrences per word.
    val counts = text
      .flatMap { line => line.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { word => (word, 1) }
      .groupBy(0) // group on the word (tuple field 0)
      .sum(1)     // sum the counts (tuple field 1)

    if (params.has("output")) {
      // One record per line, word and count separated by a single space.
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      // No explicit env.execute in this branch: print() is relied on to run
      // the job and emit the result.
      counts.print()
    }
  }
}
网友评论