美文网首页Flink
Flink-sql 基于事件时间的窗口

Flink-sql 基于事件时间的窗口

作者: wudl | 来源:发表于2021-08-11 11:17 被阅读0次

1. 基于事件时间的窗口有三种

1.1 基于事件时间的滚动窗口

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.api.common.eventtime.WatermarkStrategy.forBoundedOutOfOrderness;
import static org.apache.flink.table.api.Expressions.*;

/**
 * @ClassName : Flink_Group_Window  --  基于事件 的处理滚动窗口
 * @Description : Flink sql 窗口
 * @Author :wudl
 * @Date: 2021-08-04 23:13
 */

public class Flink_Group_ShiJianWindow {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // 读取数据流中的数据并且提取时间搓生成waterMark
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy =   WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {

                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                });
        DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        }).assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

        // 将流转化为表
        Table table = tableEnvironment.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                //  事件的处理时间
                $("rt").rowtime());

        // 开窗滚动窗口计算wordCound
        Table result = table.window(Tumble.over(lit(5).seconds()).on($("rt")).as("tw"))
                .groupBy($("id"), $("tw"))
                .select($("id"), $("id").count());

        // 将结果表转化为流进行输出

        tableEnvironment.toAppendStream(result, Row.class).print();
        env.execute();
    }
}

1.2 基于事件时间的滑动窗口

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * @ClassName : Flink_Group_Window  --  基于事件时间的滑动窗口
 * @Description : Flink sql 窗口
 * @Author :wudl
 * @Date: 2021-08-04 23:13
 */

public class Flink_Group_ShiJian_HuaDongWindow {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // 读取数据流中的数据并且提取时间搓生成waterMark
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy =   WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {

                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                });
        DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        }).assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

        // 将流转化为表
        Table table = tableEnvironment.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                //  事件的处理时间
                $("rt").rowtime());

        // 开窗滚动窗口计算wordCound
        Table result = table.window(Slide.over(lit(6).seconds()).every(lit(2).seconds()).on($("rt")).as($("sw")))
                .groupBy($("id"), $("sw"))
                .select($("id"), $("id").count());

        // 将结果表转化为流进行输出

        tableEnvironment.toAppendStream(result, Row.class).print();
        env.execute();
    }
}

1.3 基于事件时间的会话窗口

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * @ClassName : Flink_Group_Window  --  基于事件时间的滑动窗口
 * @Description : Flink sql 窗口
 * @Author :wudl
 * @Date: 2021-08-04 23:13
 */

public class Flink_Group_ShiJian_huihuaWindow {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // 读取数据流中的数据并且提取时间搓生成waterMark
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy =   WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {

                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000L;
                    }
                });
        DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        }).assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

        // 将流转化为表
        Table table = tableEnvironment.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                //  事件的处理时间
                $("rt").rowtime());

        // 开窗滚动窗口计算wordCound
        Table result = table.window(Session.withGap(lit(5).seconds()).on($("rt")).as("sw"))
                .groupBy($("id"), $("sw"))
                .select($("id"), $("id").count());

        // 将结果表转化为流进行输出

        tableEnvironment.toAppendStream(result, Row.class).print();
        env.execute();
    }
}

相关文章

  • Flink-sql 基于事件时间的窗口

    1. 基于事件时间的窗口有三种 1.1 基于事件时间的滚动窗口 1.2 基于事件时间的滑动窗口 1.3 基于事件时...

  • Flink_四大基石windows && Time

    Flink 流式计算引擎:四大基石Window窗口和Time时间基于时间窗口计算,尤其是事件时间窗口,其中滚动Tu...

  • Flink-sql 计数窗口

    1. Flink 的计数窗口有两种 1.1 计数混动窗口 1.2 计数窗口的滑动

  • Flink 概述

    Flink 相关概念 Flink 支持哪些流式特性 exactly-once 语义。 支持基于事件发生时间的窗口计...

  • Flink -sql 处理时间的窗口

    1.flink 窗口的分类 1.1 分类 2. 先看基于处理时间的窗口 2.1 处理时间的滚动窗口 2.1.1 ...

  • Flink Join实现

    Window Join 基于窗口的Join是将具有相同key并位于同一个窗口中的事件进行联结。 用法: 官方案例:...

  • 05 Flink窗口

    Flink窗口按行为分有滑动滚动窗口,按划分标准有事件时间窗口。本节将演示各个窗口的使用。 1、 系统、软件以及前...

  • 窗口事件

    窗口事件的应用 functio...

  • 在 Vue 中使用lodash对事件进行防抖和节流

    有些浏览器事件可以在短时间内快速触发多次,比如调整窗口大小或向下滚动页面。例如,监听页面窗口滚动事件,并且用户持续...

  • WNDCLASS 结构

    WNDCLASS 结构 Windows 的窗口总是基于窗口类来创建的,窗口类同时确定了处理窗口消息的窗口过程(回调...

网友评论

    本文标题:Flink-sql 基于事件时间的窗口

    本文链接:https://www.haomeiwen.com/subject/jekqbltx.html