美文网首页
Flink-sql 计数窗口

Flink-sql 计数窗口

作者: wudl | 来源:发表于2021-08-10 23:33 被阅读0次

1. Flink 的计数窗口有两种

1.1 计数混动窗口

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @ClassName : Flink_Group_Window  --  基于计数滚动窗口
 * @Description : Flink sql 窗口
 * @Author :wudl
 * @Date: 2021-08-04 23:13
 */

public class Flink_Group_Count_Window {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 将流转化为表
        Table table = tableEnvironment.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        // 开窗滚动窗口计算wordCound
        Table result = table.window(Tumble.over(rowInterval(5L)).on($("pt")).as("cw"))
                .groupBy($("id"), $("cw"))
                .select($("id"), $("id").count());

        // 将结果表转化为流进行输出

        tableEnvironment.toAppendStream(result, Row.class).print();
        env.execute();
    }
}

1.2 计数窗口的滑动

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.rowInterval;

/**
 * @ClassName : Flink_Group_Window  --  基于计数滑动窗口
 * @Description : Flink sql 窗口
 * @Author :wudl
 * @Date: 2021-08-04 23:13
 */

public class Flink_Group_Count_Sliding_Window {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 将流转化为表
        Table table = tableEnvironment.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        // 开窗滚动窗口计算wordCound
        Table result = table.window(Slide.over(rowInterval(5L)).every(rowInterval(2L)).on($("pt")).as("cw"))
                .groupBy($("id"), $("cw"))
                .select($("id"), $("id").count());

        // 将结果表转化为流进行输出

        tableEnvironment.toAppendStream(result, Row.class).print();
        env.execute();
    }
}

相关文章

  • Flink-sql 计数窗口

    1. Flink 的计数窗口有两种 1.1 计数混动窗口 1.2 计数窗口的滑动

  • Flink-sql 基于事件时间的窗口

    1. 基于事件时间的窗口有三种 1.1 基于事件时间的滚动窗口 1.2 基于事件时间的滑动窗口 1.3 基于事件时...

  • 180701-计数时间窗口数据结构的设计

    相关博文: 180625-关于时间窗口的想法 180621-一个简单的时间窗口设计与实现 维持计数时间窗口数据结构...

  • 限流算法

    计数器 滑动窗口 漏桶 令牌桶 计数器 计数器是一种最简单限流算法,其原理就是:在一段时间间隔内,对请求进行计数,...

  • 服务限流之滑动窗口计数

    上一篇 <<<服务限流之计数器方式[https://www.jianshu.com/p/5ad5f76739a2]...

  • 180621-一个简单的时间窗口设计与实现

    如何设计一个计数的时间窗口 时间窗口,通常对于一些实时信息展示中用得比较多,比如维持一个五分钟的交易明细时间窗口,...

  • 你知道常见的限流算法有哪些吗?

    我们常见的限流算法有四种:计数器(固定窗口)算法、滑动窗口算法、漏桶算法、令牌桶算法。 为什么要限流 资源是有限的...

  • 常见限流算法及方案

    1.限流 1.1 限流的基本概念 1.1.1 计数器 1.1.2 滑动窗口 1.1.3 漏桶 1.1.4 令牌桶 ...

  • Flink-SQL如何连接外部资源

    Flink-SQL如何连接外部资源 最近项目中需要把FlinkSQL对标SparkSQL做一套可视化页面,但网上针...

  • 2019-05-01 STM32_WWDG

    WWDG: 中文名:窗口看门狗 是一个7位递减计数器,不断向下递减计数,当减到一个固定值0x40时还不喂狗,就会产...

网友评论

      本文标题:Flink-sql 计数窗口

      本文链接:https://www.haomeiwen.com/subject/wckqbltx.html