FlinkCEP SQL Quick Start


Author: Woople | Published 2019-10-24 19:11

The earlier articles Hello FlinkCEP and FlinkCEP with EventTime introduced the basics of FlinkCEP. This article shows how to do pattern matching with Flink's SQL support, i.e. Detecting Patterns in Tables.

Complete Example

Full source code

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Properties;

public class FlinkCEPSqlExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        final TableSchema tableSchema = new TableSchema(new String[]{"symbol","tax","price", "rowtime"}, new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.SQL_TIMESTAMP});
        final TypeInformation<Row> typeInfo = tableSchema.toRowType();
        final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInfo).setFieldDelimiter(',');

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");

        FlinkKafkaConsumer010<Row> myConsumer = new FlinkKafkaConsumer010<>(
                "foo",
                deserSchemaBuilder.build(),
                properties);

        myConsumer.setStartFromLatest();

        DataStream<Row> stream = env.addSource(myConsumer).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
        // Register the stream as table "Ticker"; "rowtime.rowtime" marks the event-time attribute
        tableEnv.registerDataStream("Ticker", stream, "symbol,tax,price,rowtime.rowtime");

        Table result = tableEnv.sqlQuery("SELECT * " +
                "FROM Ticker " +
                "    MATCH_RECOGNIZE( " +
                "        PARTITION BY symbol " +
                "        ORDER BY rowtime " +
                "        MEASURES " +
                "            A.price AS firstPrice, " +
                "            B.price AS lastPrice " +
                "        ONE ROW PER MATCH " +
                "        AFTER MATCH SKIP PAST LAST ROW " +
                "        PATTERN (A+ B) " +
                "        DEFINE " +
                "            A AS A.price < 10, " +
                "            B AS B.price > 100 " +
                "    )");
        
        final TableSchema tableSchemaResult = new TableSchema(new String[]{"symbol","firstPrice","lastPrice"}, new TypeInformation[]{Types.STRING, Types.LONG, Types.LONG});
        final TypeInformation<Row> typeInfoResult = tableSchemaResult.toRowType();
        DataStream<Row> ds = tableEnv.toAppendStream(result, typeInfoResult);
        ds.print();
        env.execute("Flink CEP via SQL example");
    }

    private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Row> {
        private final long maxOutOfOrderness = 5000;
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Row row, long previousElementTimestamp) {
            System.out.println("Row is " + row);
            // StringUtilsPlus is a date/time conversion helper from the accompanying repository
            long timestamp = StringUtilsPlus.dateToStamp(String.valueOf(row.getField(3)));
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            System.out.println("watermark:" + StringUtilsPlus.stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}
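The watermark arithmetic inside BoundedOutOfOrdernessGenerator is simple enough to check by hand: the watermark always lags the maximum timestamp seen so far by 5 seconds. A minimal plain-Java sketch of that logic, with no Flink dependency (class and method names here are illustrative, not Flink API):

```java
import java.util.List;

public class WatermarkSketch {
    // Same 5-second bound as maxOutOfOrderness in the generator above.
    static final long MAX_OUT_OF_ORDERNESS = 5000;

    // Tracks the maximum timestamp seen so far, like currentMaxTimestamp.
    static long currentMax = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    // Record one event's timestamp and return the watermark that would
    // be emitted afterwards: max timestamp minus the out-of-orderness bound.
    static long onEvent(long timestamp) {
        currentMax = Math.max(timestamp, currentMax);
        return currentMax - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        // Event timestamps (millis) arriving slightly out of order.
        for (long t : List.of(10_000L, 12_000L, 11_000L, 20_000L)) {
            System.out.println("event=" + t + " watermark=" + onEvent(t));
        }
        // Note the late event 11_000 does not move the watermark backwards.
    }
}
```

Because the watermark only ever tracks the running maximum, a late event within the 5-second bound is still assigned its own timestamp but never regresses the watermark.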

Example Walkthrough

  1. PARTITION BY symbol logically partitions the stream by the symbol field.
  2. ORDER BY rowtime orders events by event time, guarding against out-of-order data.
  3. MEASURES defines the output columns.
  4. ONE ROW PER MATCH is the output mode; the alternative is ALL ROWS PER MATCH, but Flink 1.9 currently supports only ONE ROW PER MATCH.
  5. AFTER MATCH SKIP PAST LAST ROW is the strategy for where to resume after a match; this strategy guarantees that each event participates in at most one match. The available strategies are:
  • SKIP PAST LAST ROW - resumes the pattern matching at the next row after the last row of the current match.
  • SKIP TO NEXT ROW - continues searching for a new match starting at the next row after the starting row of the match.
  • SKIP TO LAST variable - resumes the pattern matching at the last row that is mapped to the specified pattern variable.
  • SKIP TO FIRST variable - resumes the pattern matching at the first row that is mapped to the specified pattern variable.
  6. PATTERN (A+ B) defines how the two pattern variables relate. (A B) would mean the first event must satisfy A's condition and the second must satisfy B's; (A+ B) means one or more events satisfying A's condition occur before the event satisfying B's. See Defining a Pattern for the full rules.
  7. DEFINE specifies the concrete condition for each pattern variable.
  8. Although the job is expressed as SQL, the internal execution is the same as described in FlinkCEP with EventTime; Flink simply parses the SQL down to the same CEP machinery.
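To make points 5 and 6 concrete, the effect of PATTERN (A+ B) with SKIP PAST LAST ROW can be simulated in plain Java over a pre-sorted list of prices. This is only an illustrative sketch of the semantics, not how Flink's MATCH_RECOGNIZE operator is implemented; it also reports the first A row as firstPrice, matching the measure's name (Flink's exact resolution of A.price when A+ maps multiple rows may differ):

```java
import java.util.ArrayList;
import java.util.List;

public class PatternSketch {
    // One output row per match: firstPrice from the A run, lastPrice from the B row.
    record Match(long firstPrice, long lastPrice) {}

    static List<Match> findMatches(List<Long> prices) {
        List<Match> out = new ArrayList<>();
        int i = 0;
        while (i < prices.size()) {
            // A+: one or more consecutive rows with price < 10
            int start = i;
            while (i < prices.size() && prices.get(i) < 10) i++;
            if (i > start && i < prices.size() && prices.get(i) > 100) {
                // B: the next row has price > 100 -> a complete match
                out.add(new Match(prices.get(start), prices.get(i)));
                i++; // SKIP PAST LAST ROW: resume after the last row of the match
            } else if (i == start) {
                i++; // row matches neither A nor B; move on
            }
            // an A run not followed by a B row is simply abandoned
        }
        return out;
    }

    public static void main(String[] args) {
        // Three small trades, a large one, some noise, then a second match.
        List<Long> prices = List.of(5L, 3L, 7L, 150L, 50L, 4L, 200L);
        System.out.println(findMatches(prices));
    }
}
```

With SKIP PAST LAST ROW, the rows 5, 3, 7, 150 are consumed by the first match and cannot seed another one; matching resumes at 50, which is why each event contributes to at most one output row.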

Summary

This article presented a minimal FlinkCEP SQL example so readers can quickly see FlinkCEP SQL in action. In business terms, the pattern fires when a run of consecutive small trades is followed by a single large trade.


    Original link: https://www.haomeiwen.com/subject/fpudvctx.html