美文网首页
05 Flink窗口

05 Flink窗口

作者: 张力的程序园 | 来源:发表于2020-06-30 00:19 被阅读0次

Flink窗口按行为分有滑动滚动窗口,按划分标准有事件时间窗口。本节将演示各个窗口的使用。

1、 系统、软件以及前提约束

2、操作步骤

  • 创建maven项目,加入以下依赖:
        <dependency>
            <!--spark依赖-->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!--scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
  • 在项目的src/main/java文件夹下创建WindowTest.java
package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.storm.trident.windowing.config.SlidingCountWindow;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        // local模式
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");
        @SuppressWarnings("serial")
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                //滑动窗口按数据个数划分
//                .keyBy("word").countWindow(10,5)
                //滚动窗口按数据个数划分
//                .keyBy("word").countWindow(5)
                //滚动窗口按时间划分
//                .keyBy("word").timeWindow(Time.seconds(5))
                //滑动窗口按时间划分
                .keyBy("word").timeWindow(Time.seconds(10), Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount(zl_test)");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
  • 测试
    (1)打开windows命令行,执行以下命令:
nc -l -p 9999

(2)在idea中执行WindowTest.java,注意注释部分的解释
(3)在nc窗口输入字符串,观察idea中的控制台,会有统计结果打印。
以上就是Flink窗口的测试

相关文章

  • 05 Flink窗口

    Flink窗口按行为分有滑动滚动窗口,按划分标准有事件时间窗口。本节将演示各个窗口的使用。 1、 系统、软件以及前...

  • Flink 的窗口

    Flink 的窗口分类: 1.Flink 的窗口分类如下图: 2. 窗口的详细介绍:

  • Flink 窗口

    window的使用场景 聚合统计、数据合并(积攒批)、双流join Window Assigner Window ...

  • Flink滑动窗口原理与细粒度滑动窗口的性能问题

    Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner...

  • Flink实战(七) - Time & Windows编

    0 相关源码 掌握Flink中三种常用的Time处理方式,掌握Flink中滚动窗口以及滑动窗口的使用,了解Flin...

  • Flink-sql 计数窗口

    1. Flink 的计数窗口有两种 1.1 计数混动窗口 1.2 计数窗口的滑动

  • 六、Flink窗口

    概述 Apache Flink 是一个为生产环境而生的流处理器,具有易于使用的 API,可以用于定义高级流分析程序...

  • flink 时间窗口

    flink强大的窗口功能,是相较于其他流计算引擎比较有优势的地方。flink中窗口是如何设计的?一共有四个要素。1...

  • flink异步io应用场景之流表join维表

    1.flink异步io的定义参考 http://wuchong.me/blog/2017/05/17/flink-...

  • Flink -sql 处理时间的窗口

    1.flink 窗口的分类 1.1 分类 2. 先看基于处理时间的窗口 2.1 处理时间的滚动窗口 2.1.1 ...

网友评论

      本文标题:05 Flink窗口

      本文链接:https://www.haomeiwen.com/subject/jsrzfktx.html