美文网首页
05 Flink窗口

05 Flink窗口

作者: 张力的程序园 | 来源:发表于2020-06-30 00:19 被阅读0次

    Flink窗口按行为分有滑动滚动窗口,按划分标准有事件时间窗口。本节将演示各个窗口的使用。

    1、 系统、软件以及前提约束

    2、操作步骤

    • 创建maven项目,加入以下依赖:
            <dependency>
                <!--spark依赖-->
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0</version>
            </dependency>
            <!--scala依赖-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.8</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.5.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.5.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.5.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.5.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>1.5.0</version>
            </dependency>
    
    • 在项目的src/main/java文件夹下创建WindowTest.java
    package flink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    import org.apache.storm.trident.windowing.config.SlidingCountWindow;
    
    public class WindowTest {
        public static void main(String[] args) throws Exception {
            // local模式
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
            @SuppressWarnings("serial")
            DataStream<WordWithCount> windowCounts = text
                    .flatMap(new FlatMapFunction<String, WordWithCount>() {
                        public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                            for (String word : value.split("\\s")) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    })
                    //滑动窗口按数据个数划分
    //                .keyBy("word").countWindow(10,5)
                    //滚动窗口按数据个数划分
    //                .keyBy("word").countWindow(5)
                    //滚动窗口按时间划分
    //                .keyBy("word").timeWindow(Time.seconds(5))
                    //滑动窗口按时间划分
                    .keyBy("word").timeWindow(Time.seconds(10), Time.seconds(5))
                    .reduce(new ReduceFunction<WordWithCount>() {
                        public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                            return new WordWithCount(a.word, a.count + b.count);
                        }
                    });
            windowCounts.print().setParallelism(1);
            env.execute("Socket Window WordCount(zl_test)");
        }
    
        public static class WordWithCount {
            public String word;
            public long count;
    
            public WordWithCount() {
            }
    
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
    
            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    }
    
    • 测试
      (1)打开windows命令行,执行以下命令:
    nc -l -p 9999
    

    (2)在idea中执行WindowTest.java,注意注释部分的解释
    (3)在nc窗口输入字符串,观察idea中的控制台,会有统计结果打印。
    以上就是Flink窗口的测试

    相关文章

      网友评论

          本文标题:05 Flink窗口

          本文链接:https://www.haomeiwen.com/subject/jsrzfktx.html