FlinkCEP SQL Quick Start

Author: Woople | Published: 2019-10-24 19:11

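    The earlier articles "Hello FlinkCEP" and "FlinkCEP with EventTime" introduced the basic usage of FlinkCEP. This article shows how to do the same pattern matching through the SQL interface that Flink provides, namely Detecting Patterns in Tables.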

    Complete example

    Source code link

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import java.util.Properties;
    
    public class FlinkCEPSqlExample {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Use event time so that ORDER BY rowtime works on the time carried by each record.
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // Schema of the CSV records read from Kafka: symbol,tax,price,rowtime
            final TableSchema tableSchema = new TableSchema(
                    new String[]{"symbol", "tax", "price", "rowtime"},
                    new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.SQL_TIMESTAMP});
            final TypeInformation<Row> typeInfo = tableSchema.toRowType();
            final CsvRowDeserializationSchema.Builder deserSchemaBuilder =
                    new CsvRowDeserializationSchema.Builder(typeInfo).setFieldDelimiter(',');
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
    
            FlinkKafkaConsumer010<Row> myConsumer = new FlinkKafkaConsumer010<>(
                    "foo",
                    deserSchemaBuilder.build(),
                    properties);
    
            myConsumer.setStartFromLatest();
    
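            // Assign event-time timestamps and watermarks before the stream is turned into a table.
            DataStream<Row> stream = env.addSource(myConsumer)
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
            // "rowtime.rowtime" declares the rowtime field as the table's event-time attribute.
            tableEnv.registerDataStream("Ticker", stream, "symbol,tax,price,rowtime.rowtime");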
    
            Table result = tableEnv.sqlQuery("SELECT * " +
                    "FROM Ticker " +
                    "    MATCH_RECOGNIZE( " +
                    "        PARTITION BY symbol " +
                    "        ORDER BY rowtime " +
                    "        MEASURES " +
                    "            A.price AS firstPrice, " +
                    "            B.price AS lastPrice " +
                    "        ONE ROW PER MATCH " +
                    "        AFTER MATCH SKIP PAST LAST ROW " +
                    "        PATTERN (A+ B) " +
                    "        DEFINE " +
                    "            A AS A.price < 10, " +
                    "            B AS B.price > 100 " +
                    "    )");
            
            // SELECT * returns the partition column followed by the MEASURES columns.
            final TableSchema tableSchemaResult = new TableSchema(
                    new String[]{"symbol", "firstPrice", "lastPrice"},
                    new TypeInformation[]{Types.STRING, Types.LONG, Types.LONG});
            final TypeInformation<Row> typeInfoResult = tableSchemaResult.toRowType();
            DataStream<Row> ds = tableEnv.toAppendStream(result, typeInfoResult);
            ds.print();
            env.execute("Flink CEP via SQL example");
        }
    
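        // Periodic watermark assigner that tolerates events arriving up to 5 seconds out of order.
        private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Row> {
            private final long maxOutOfOrderness = 5000; // milliseconds
            private long currentMaxTimestamp;
    
            @Override
            public long extractTimestamp(Row row, long previousElementTimestamp) {
                System.out.println("Row is " + row);
                // StringUtilsPlus is a helper class that is not shown in this article. It is used here
                // to convert between date strings and epoch milliseconds. Field 3 is the rowtime column.
                long timestamp = StringUtilsPlus.dateToStamp(String.valueOf(row.getField(3)));
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("watermark:" + StringUtilsPlus.stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
                return timestamp;
            }
    
            @Override
            public Watermark getCurrentWatermark() {
                // The watermark trails the largest timestamp seen so far by maxOutOfOrderness.
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }
        }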
    }
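
The watermark assigner above calls `StringUtilsPlus.dateToStamp` and `StringUtilsPlus.stampToDate`, but that class is not part of the listing. The following is only a guess at what such a helper might look like, assuming the date string has the `yyyy-MM-dd HH:mm:ss[.f]` form that `String.valueOf` produces for a `java.sql.Timestamp` field. The method bodies are assumptions, not the author's code.

```java
import java.sql.Timestamp;

// Hypothetical stand-in for the StringUtilsPlus helper used in the example.
// The original class is not shown in the article, so this is an assumption.
final class StringUtilsPlus {
    private StringUtilsPlus() {}

    // Parses "yyyy-MM-dd HH:mm:ss[.f...]" in the JVM default time zone
    // and returns the corresponding epoch milliseconds.
    public static long dateToStamp(String date) {
        return Timestamp.valueOf(date).getTime();
    }

    // Formats epoch milliseconds (given as a decimal string) back into
    // the "yyyy-MM-dd HH:mm:ss.f" form, again in the JVM default time zone.
    public static String stampToDate(String stamp) {
        return new Timestamp(Long.parseLong(stamp)).toString();
    }

    public static void main(String[] args) {
        long timestamp = dateToStamp("2019-10-24 19:11:05.0");
        // Same arithmetic as currentMaxTimestamp - maxOutOfOrderness in the assigner.
        long watermark = timestamp - 5000;
        System.out.println("watermark:" + stampToDate(String.valueOf(watermark)));
    }
}
```

Because both directions use the JVM default time zone, a value converted to milliseconds and back yields the original string, which is all the debug output in the assigner needs.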
    

    Explanation of the example

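    1. PARTITION BY symbol logically partitions the rows by the symbol field.
    2. ORDER BY rowtime sorts the rows of each partition by event time, so that out-of-order arrival does not scramble the sequence.
    3. MEASURES defines the output columns. Because A can map to several rows in this pattern, a plain A.price refers to the last row mapped to A; FIRST(A.price) would return the price of the first one.
    4. ONE ROW PER MATCH is the output mode. The alternative is ALL ROWS PER MATCH, but Flink 1.9 supports only ONE ROW PER MATCH.
    5. AFTER MATCH SKIP PAST LAST ROW is the after-match skip strategy, which decides where matching resumes once a match has been found. This strategy guarantees that each event takes part in at most one match. The available strategies are:
    • SKIP PAST LAST ROW - resumes the pattern matching at the next row after the last row of the current match.
    • SKIP TO NEXT ROW - continues searching for a new match starting at the next row after the starting row of the match.
    • SKIP TO LAST variable - resumes the pattern matching at the last row that is mapped to the specified pattern variable.
    • SKIP TO FIRST variable - resumes the pattern matching at the first row that is mapped to the specified pattern variable.
    6. PATTERN (A+ B) defines how the two pattern variables relate. (A B) would mean that the first event must satisfy the condition of A and the very next event must satisfy the condition of B. (A+ B) means that one or more events satisfying A may occur before the event that satisfies B. See Defining a Pattern for the full rules.
    7. DEFINE specifies the condition for each pattern variable.
    8. Although the job is written as SQL, the logic that runs internally is the same as the one described in "FlinkCEP with EventTime"; Flink simply parses the SQL and translates it.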
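
To make the pattern and the skip strategies above more concrete, here is a plain-Java sketch that mimics PATTERN (A+ B) on the prices of a single partition that is already sorted by rowtime. It is a simplified model for illustration, not Flink's implementation. The class name `PatternSketch`, the `Skip` enum and the sample prices are all made up for this sketch, and it assumes that A.price in MEASURES resolves to the last row mapped to A.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of PATTERN (A+ B) with A.price < 10 and B.price > 100.
// Illustration only: this is not how Flink evaluates MATCH_RECOGNIZE internally.
final class PatternSketch {
    enum Skip { PAST_LAST_ROW, TO_NEXT_ROW }

    private PatternSketch() {}

    // Returns one {A.price, B.price} pair per match, like ONE ROW PER MATCH.
    public static List<long[]> match(long[] prices, Skip skip) {
        List<long[]> matches = new ArrayList<>();
        int start = 0;
        while (start < prices.length) {
            int i = start;
            // A+ : consume consecutive rows whose price is below 10.
            while (i < prices.length && prices[i] < 10) {
                i++;
            }
            // B : the row right after the run of A rows must exceed 100.
            boolean matched = i > start && i < prices.length && prices[i] > 100;
            if (matched) {
                // prices[i - 1] is the last row mapped to A, prices[i] is B.
                matches.add(new long[]{prices[i - 1], prices[i]});
                // PAST_LAST_ROW resumes after B; TO_NEXT_ROW resumes one row after the match start.
                start = (skip == Skip.PAST_LAST_ROW) ? i + 1 : start + 1;
            } else {
                start++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long[] prices = {5, 8, 200, 3, 150, 50};
        for (Skip skip : Skip.values()) {
            System.out.println(skip + ": " + match(prices, skip).size() + " match(es)");
        }
    }
}
```

With the sample prices, the rows 5, 8, 200 form one match under SKIP PAST LAST ROW, whereas the SKIP TO NEXT ROW model also reports the overlapping match 8, 200, which is why the former guarantees that each event is used at most once.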

    Summary

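    This article gives a simple FlinkCEP SQL example that lets readers quickly see FlinkCEP SQL in action. In business terms, the example emits a result whenever a run of consecutive small transactions is immediately followed by a large one.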
