一、Dependencies in the Maven project's pom.xml
<properties>
    <flink.version>1.9.1</flink.version>
</properties>

<!-- Flink dependencies -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
二、Test data
Input file path: ./hello.txt
hello world
hello flink
hello spark
hello scala
how are you
fine thank you
and you
三、Flink WordCount (Java version)
package com.cn.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(2);
        String inputPath = "C:\\workstation\\maven_project\\flink_wordcount\\src\\main\\resources\\hello.txt";
        // DataStreamSource extends SingleOutputStreamOperator, which in turn extends DataStream
        // Read the file and treat it as a (bounded) stream
        DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);
        // Alternatively, read from a socket text stream (localhost:9000 is configurable);
        // see the parameterized socket-source sketch after this class
        // DataStreamSource<String> inputDataStream2 = env.socketTextStream("localhost", 9000);
        // Transformations on the data stream
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split(" ");
                        for (String word : words) {
                            collector.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(0) // merge identical words: keyBy for streaming, groupBy for batch (see the DataSet sketch below)
                .sum(1);
        result.print(); // nothing is printed yet at this point
        // Trigger execution of the streaming job
        env.execute();
    }
}
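The commented-out socket source above can be made configurable instead of hard-coding the host and port. The following is a minimal sketch, not part of the original post: it uses ParameterTool (which Flink provides for parsing program arguments) to read --host and --port; the class name SocketStreamWordCount and the argument names are my own choices. To feed it text locally, a tool such as netcat (nc -lk 9000) can serve as the source.

package com.cn.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
        // Read --host and --port from the program arguments, with defaults (argument names are assumptions)
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.get("host", "localhost");
        int port = params.getInt("port", 9000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Unbounded text stream from the socket; counts keep updating as lines arrive
        env.socketTextStream(host, port)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .sum(1)
                .print();

        env.execute("Socket WordCount");
    }
}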
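For contrast with the keyBy comment above, here is a rough batch version of the same word count using the DataSet API (available through the flink-java dependency already listed in the pom). It is only an illustrative sketch: the class name BatchWordCount is mine, and it assumes the input file sits at ./hello.txt. The whole file is read as a bounded data set, grouped with groupBy instead of keyBy, and the final counts are printed once.

package com.cn.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Bounded input: the whole file is read before results are produced
        DataSet<String> input = env.readTextFile("./hello.txt");

        DataSet<Tuple2<String, Integer>> counts = input
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .groupBy(0) // batch grouping, the counterpart of keyBy in the streaming job
                .sum(1);

        // In the DataSet API, print() triggers execution, so no explicit env.execute() is needed
        counts.print();
    }
}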
四、Run results
Note: as more records from the stream are processed, the task keeps updating the counts. The "1>" / "2>" prefix is the index of the parallel subtask (parallelism is set to 2) that produced the output.
2> (how,1)
1> (hello,1)
2> (you,1)
2> (fine,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (and,1)
1> (spark,1)
1> (hello,4)
2> (you,3)
1> (scala,1)
2> (world,1)
1> (are,1)
1> (thank,1)
2> (flink,1)