Download Flink
https://flink.apache.org/downloads.html#apache-flink-180
1. Download a binary from the [downloads page](http://flink.apache.org/downloads.html).
You can pick any Hadoop/Scala combination you like.
If you plan to just use the local file system, any Hadoop version will work fine.
2. Go to the download directory.
3. Unpack the downloaded archive.
$ cd ~/Downloads # Go to download directory
$ tar xzf flink-*.tgz # Unpack the downloaded archive
$ cd flink-1.8.0
Start a Local Flink Cluster
$ ./bin/start-cluster.sh # Start Flink
Check the Dispatcher’s web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.
Verify the startup via the web UI:
Open http://localhost:8081.
If the dashboard loads and reports one available TaskManager, everything is OK.
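If you prefer a programmatic check instead of opening a browser, the sketch below queries the Dispatcher's monitoring REST API once (this is an illustrative helper, not part of the demo; it assumes the default REST port 8081 and the /overview endpoint — adjust if your setup differs):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical standalone helper: fetches the cluster overview from the REST API.
public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8081/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        // A default local cluster should report one TaskManager, e.g. "taskmanagers":1
        System.out.println("HTTP " + conn.getResponseCode() + " -> " + body);
    }
}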
Run the demo
package com.river;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author river
 * @date 2019/4/18 15:57
 **/
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            // final ParameterTool params = ParameterTool.fromArgs(args);
            // port = params.getInt("port");
            port = 9000;
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
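The two commented-out lines in main() come from the official example, which reads the port from the program arguments instead of hard-coding 9000. A minimal sketch of that variant, assuming Flink's org.apache.flink.api.java.utils.ParameterTool utility (the class name below is just for illustration):

import org.apache.flink.api.java.utils.ParameterTool;

// Hypothetical standalone snippet showing the argument-parsing variant.
public class PortFromArgs {
    public static void main(String[] args) {
        // Parse "--port <port>" from the command line; fall back to 9000 if it is missing.
        final ParameterTool params = ParameterTool.fromArgs(args);
        final int port = params.getInt("port", 9000);
        System.out.println("Would connect to localhost:" + port);
    }
}

Note that when the program is run directly from an IDE rather than submitted to the cluster, StreamExecutionEnvironment.getExecutionEnvironment() creates a local environment, so the same main() works in both cases.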
Open port 9000 (start a netcat listener and type some input)
[river@s201 ~]$ nc -l 9000
hello lucy how are you
hello
how are you
Submit the job with Flink
[river@s201 flink-1.8.0]$ ./bin/flink run -c com.river.SocketWindowWordCount ~/Downloads/flink-demo-1.0-SNAPSHOT.jar
Starting execution of program
Check the logs
[river@s201 ~]$ tail -f /soft/flink-1.8.0/log/flink-river-taskexecutor-0-s201.out
how : 1
hello : 1
lucy : 1
are : 1
you : 1
hello : 1
hello : 1
hello : 1
hello : 1
hello : 1
how : 1
are : 1
you : 1
are : 1
Because the job uses a 5-second window that slides every 1 second, each input line is counted in up to five consecutive window results, which is why the same words repeat in the output above. The job's execution status can also be seen in the web UI.
References
https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/local_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/flink_on_windows.html