这篇文章以flink官方local cluster教程为主线,引导大家体验一下flink的初次开发。文章中所提到的代码我已经放到github上,欢迎指正。
下载和启动Flink
Flink可以运行在Linux、Mac和Windows上,唯一的要求就是必须安装Java 8或以上版本。可以使用一下命令查看:
(py2.7) bogon:flink-examples yss$ java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
java安装后去官网下载Flink,然后解压即可运行。这里以flink-1.7.0为例。
cd ~/Downloads/
tar xzf flink-1.7.0-bin-scala_2.12.tgz
cd flink-1.7.0
以本地模式启动Flink
在flink目录下运行以下命令即可启动
./bin/start-cluster.sh # Start Flink
这时候可以打开浏览器访问http://localhost:8081
]flink的监控页面。
我们通过jps可以看到多出来两个JVM进程,运行的主类StandaloneSessionClusterEntrypoint和TaskManagerRunner。
[root@henghe-121 bin]# jps -l
16016 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
16513 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
16619 sun.tools.jps.Jps
集群启动后就可以开发我们的flink程序了~
写第一个Flink WorkCount程序
官网给出的Scala例子如下:
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// the port to connect to
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text = env.socketTextStream("localhost", port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}
代码本身不难,不过建立scala的Maven工程,打成可执行jar还需要很多设置。在这里我将官方的例子配置做了很多简化,对于初学者更友好,并将代码上传到了github上。大家clone之后,直接运行mvn clean package就可以生成jar包。
运行WordCount
对于这个例子首先要用netcat创建服务,监听某个端口。
nc -l 9000
之后就可以提交job任务了
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program
然后再nc端输入要处理的字符串,flink就可以拿到数据进行处理
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye
该任务的输出在flink的log下以.out结尾的文件下。
$ tail -f flink-yss-taskexecutor-0-bogon.out
WordWithCount(hello,1)
WordWithCount(sdsd,1)
WordWithCount(sdsd,1)
WordWithCount(sss,1)
最后测试完可以关闭集群:
./bin/stop-cluster.sh
网友评论