Flink窗口按行为分有滑动滚动窗口,按划分标准有事件时间窗口。本节将演示各个窗口的使用。
1、 系统、软件以及前提约束
- CentOS 7 64 工作站 作者的机子ip是192.168.100.200,请读者根据自己实际情况设置
- idea 2018.1
- 在Win10中安装nc
https://www.jianshu.com/p/4f6fb8834ad9
2、操作步骤
- 创建maven项目,加入以下依赖:
<dependency>
<!--spark依赖-->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!--scala依赖-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.5.0</version>
</dependency>
- 在项目的src/main/java文件夹下创建WindowTest.java
package flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.storm.trident.windowing.config.SlidingCountWindow;
public class WindowTest {
public static void main(String[] args) throws Exception {
// local模式
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
@SuppressWarnings("serial")
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
//滑动窗口按数据个数划分
// .keyBy("word").countWindow(10,5)
//滚动窗口按数据个数划分
// .keyBy("word").countWindow(5)
//滚动窗口按时间划分
// .keyBy("word").timeWindow(Time.seconds(5))
//滑动窗口按时间划分
.keyBy("word").timeWindow(Time.seconds(10), Time.seconds(5))
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word, a.count + b.count);
}
});
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount(zl_test)");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
- 测试
(1)打开windows命令行,执行以下命令:
nc -l -p 9999
(2)在idea中执行WindowTest.java,注意注释部分的解释
(3)在nc窗口输入字符串,观察idea中的控制台,会有统计结果打印。
以上就是Flink窗口的测试
网友评论