代码（Flink 示例：逐行读取本地文件作为数据源，并在 1 分钟滚动窗口内统计逗号分隔的单词个数）
/**
 * Flink source that tails a local text file and emits every non-blank line as a record.
 *
 * <p>The file is opened once when {@link #run} starts and read line by line. When the reader
 * reaches the current end of the file, the source waits and then retries. As a result, lines
 * appended to the file later are still picked up.
 *
 * <p>After each emitted line the source pauses for {@value #EMIT_INTERVAL_SECONDS} seconds,
 * which throttles output for demo purposes.
 */
public class SourceFromFile extends RichSourceFunction<String> {

    /** File read by the no-arg constructor (kept for backward compatibility). */
    private static final String DEFAULT_PATH = "E:\\documents\\test.txt";

    /** Pause after each emitted line. */
    private static final long EMIT_INTERVAL_SECONDS = 10;

    /** Pause while the reader is at end of file, to avoid a busy-spin. */
    private static final long IDLE_POLL_MILLIS = 1000;

    /** Path of the file to tail. */
    private final String path;

    /** Cleared by {@link #cancel()} (called from another thread) to stop {@link #run}. */
    private volatile boolean isRunning = true;

    /** Creates a source reading the default file {@code E:\documents\test.txt}. */
    public SourceFromFile() {
        this(DEFAULT_PATH);
    }

    /**
     * Creates a source reading the given file.
     *
     * @param path path of the text file to tail
     */
    public SourceFromFile(String path) {
        this.path = path;
    }

    /**
     * Reads the file until cancelled, emitting each non-blank line.
     *
     * @param ctx Flink context used to emit records
     * @throws Exception if the file cannot be opened or read, or the thread is interrupted while
     *     sleeping
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // try-with-resources: the reader is closed even when the source is cancelled or fails.
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            while (isRunning) {
                String line = reader.readLine();
                if (line == null) {
                    // Current end of file: wait for more data instead of spinning at 100% CPU.
                    TimeUnit.MILLISECONDS.sleep(IDLE_POLL_MILLIS);
                    continue;
                }
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                // Checkpointing is enabled by the job, so emit under the checkpoint lock.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
                TimeUnit.SECONDS.sleep(EMIT_INTERVAL_SECONDS);
            }
        }
    }

    /** Signals {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
public class WindowWordCount {
public static void main(String[] args) throws Exception {
final ParameterTool parameters = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
DataStreamSource<String> dataStream = env.addSource(new SourceFromFile()).setParallelism(1);
dataStream
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] arr = value.split(",");
for (String item : arr) {
out.collect(new Tuple2<>(item, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.minutes(1))
.sum(1)
.print();
env.execute("WindowWordCount");
}
}
测试
文件中输入:
aaa,bbb,ccc
结果（行首的 `2>`、`3>` 等前缀是 print 输出所在的并行子任务编号）:
2> (bbb,1)
3> (aaa,1)
4> (ccc,1)
在文件末尾追加一行（数据源会持续读取新追加的行）:
bbb,ccc,ddd
结果（每个 1 分钟滚动窗口单独计数，不会累加上一个窗口的结果，所以 bbb、ccc 仍为 1）:
4> (ccc,1)
2> (bbb,1)
4> (ddd,1)
再在文件末尾追加一行:
aaa,aaa,aaa
结果:
3> (aaa,3)
网友评论