美文网首页
Flink-2.Flink Table API

Flink-2.Flink Table API

作者: 笨鸡 | 来源:发表于2022-03-08 14:03 被阅读0次

Flink 1.14.3

package com.ctgu.flink.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;


public class Flink_Table_Window {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String createSql = "CREATE TABLE windowTable " +
                "    (" +
                "    `id` STRING," +
                "    `timestamp` BIGINT," +
                "    `value` DOUBLE," +
                "    `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3)," +
                "    `pt` AS PROCTIME()," +
                "    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND" +
                "    )" +
                "    WITH (" +
                "       'connector'='filesystem'," +
                "       'format'='csv'," +
                "       'csv.field-delimiter'=' '," +
                "       'path'='data/dataInfo.txt'" +
                "    )";

        tableEnv.executeSql(createSql);
        Table table = tableEnv.from("windowTable");

        table.printSchema();

        Table tumble = table.window(Tumble.over(lit(4).seconds()).on($("time_ltz")).as("tw"))
                .groupBy($("tw"), $("id"))
                .select($("id"), $("value").count(), $("tw").end().toTime());

        Table slide = table.window(Slide.over(lit(20).seconds()).every(lit(4).seconds()).on($("time_ltz")).as("tw"))
                .groupBy($("tw"), $("id"))
                .select($("id"), $("value").count(), $("tw").end().toTime());


        Table result = table.select($("id"), $("value"), $("time_ltz").toTime());

        tableEnv.toDataStream(result, Row.class).print("result");
        tableEnv.toDataStream(tumble, Row.class).print("tumble");
        tableEnv.toDataStream(slide, Row.class).print("slide");

        Table rank = table.window(
                Over.partitionBy($("id"))
//                        .orderBy($("pt")).preceding(rowInterval(10L)).as("ow"))
                        .orderBy($("time_ltz")).preceding(lit(4).seconds()).as("ow"))
                .select($("id"),
                        $("value"),
                        $("value").max().over($("ow")).as("value_max"),
                        $("time_ltz"));

        rank.printSchema();

        tableEnv.toChangelogStream(rank, Schema.newBuilder()
                .column("id", "STRING")
                .column("score", "DOUBLE")
                .column("value_max", "DOUBLE")
                .column("time_ltz", "TIMESTAMP_LTZ(3)")
                .build()).print("rank");

        result.printSchema();

        env.execute("Table SQL");
    }

}


maven 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ctgu</groupId>
    <artifactId>flink_class</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink-version>1.14.3</flink-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2-test3</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>
    </dependencies>
</project>

相关文章

网友评论

      本文标题:Flink-2.Flink Table API

      本文链接:https://www.haomeiwen.com/subject/tysarrtx.html