美文网首页数客联盟
快速构建Flink Streaming SQL测试用例

快速构建Flink Streaming SQL测试用例

作者: Woople | 来源:发表于2019-12-30 18:43 被阅读0次

    根据官网Structure of Table API and SQL Programs的样例介绍,如果要测试一个Streaming SQL,需要先注册一个table source,然后执行一条SQL,最后再注册一个table sink并将查询结果插入其中。table source和table sink有很多种,本文将介绍基于内存的table source和table sink,方便对SQL进行测试。代码传送门

    测试用例以及Table source

    /**
     * Minimal end-to-end demo of a Flink streaming SQL job that uses an
     * in-process source and sink so the query can be tried out locally.
     *
     * <p>The job registers a generated stream as table {@code Orders}, runs a
     * plain projection over it and inserts the result into the table sink
     * registered as {@code output}.
     */
    public class StreamingBaseSqlDemo {
        /**
         * Builds and runs the demo pipeline in a local execution environment.
         *
         * @param args unused
         * @throws Exception if building or executing the job fails
         */
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            env.setParallelism(2);

            // Source: expose the generated tuples as table "Orders".
            DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new DataSource());
            tableEnv.registerDataStream("Orders", ds, "user, product, amount");
            Table result = tableEnv.sqlQuery("SELECT user, product, amount FROM Orders");

            // Sink schema mirrors the three projected columns.
            String[] fieldNames = {"user", "product", "amount"};
            TypeInformation<?>[] fieldTypes = {Types.LONG, Types.STRING, Types.INT};

            TableSink<?> sink = new MemoryAppendStreamTableSink(fieldNames, fieldTypes);
            tableEnv.registerTableSink("output", sink);
            result.insertInto("output");

            env.execute();
        }

        /**
         * Source that emits a fixed number of randomly generated
         * {@code (user, product, amount)} records with a random pause before
         * each one, and logs every record to standard output.
         */
        private static class DataSource extends RichParallelSourceFunction<Tuple3<Long, String, Integer>> {
            /** Number of records produced by one invocation of {@link #run}. */
            private static final int NUM_ELEMENTS = 20;

            /** Cleared by {@link #cancel()} to stop the emit loop. */
            private volatile boolean running = true;

            /**
             * Emits up to {@link #NUM_ELEMENTS} random records, stopping early
             * once {@link #cancel()} has been called.
             *
             * @param ctx context used to emit the generated records
             * @throws Exception if the pause between records is interrupted
             */
            @Override
            public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
                String[] products = new String[]{"iPhoneX", "iPhone11", "iPhone11 Pro Max"};

                int emitted = 0;
                while (running && emitted < NUM_ELEMENTS) {
                    // Random pause (in whole seconds) so records trickle in like a live stream.
                    Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);

                    // Bound the product index by the array length instead of a magic number.
                    Tuple3<Long, String, Integer> data = new Tuple3<>(
                            RandomUtils.nextLong(1, 100),
                            products[RandomUtils.nextInt(0, products.length)],
                            RandomUtils.nextInt(10000, 20000));
                    ctx.collect(data);
                    // NOTE(review): logged after collect(), so the sink's "Result:" line
                    // may be printed before this one - confirm if ordering matters.
                    System.out.println("sand data:" + data);
                    emitted++;
                }
            }

            /** Requests the emit loop in {@link #run} to stop. */
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    

    Table sink

    /**
     * Append-only table sink for local testing: every incoming {@link Row} is
     * printed to standard output with a {@code Result:} prefix.
     *
     * <p>Instances are immutable; {@link #configure} returns a new sink rather
     * than modifying this one.
     */
    public class MemoryAppendStreamTableSink implements AppendStreamTableSink<Row> {
        private final String[] fieldNames;
        private final TypeInformation<?>[] fieldTypes;

        /**
         * Creates a sink with the given schema.
         *
         * @param fieldNames names of the columns written to this sink
         * @param fieldTypes types of the columns, in the same order as {@code fieldNames}
         */
        public MemoryAppendStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            this.fieldNames = fieldNames;
            this.fieldTypes = fieldTypes;
        }

        /**
         * Attaches the printing sink to the stream; delegates to
         * {@link #consumeDataStream} and discards the returned handle.
         *
         * @param dataStream stream of result rows to print
         */
        @Override
        public void emitDataStream(DataStream<Row> dataStream) {
            consumeDataStream(dataStream);
        }

        /**
         * Attaches the printing sink to the stream, using the same parallelism
         * as the incoming stream.
         *
         * @param dataStream stream of result rows to print
         * @return the handle of the added sink
         */
        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
            return dataStream.addSink(new DataSink()).setParallelism(dataStream.getParallelism());
        }

        /** @return row type information built from the configured field types and names */
        @Override
        public TypeInformation<Row> getOutputType() {
            return new RowTypeInfo(getFieldTypes(), getFieldNames());
        }

        /** @return the configured column names */
        @Override
        public String[] getFieldNames() {
            return fieldNames;
        }

        /** @return the configured column types */
        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return fieldTypes;
        }

        /**
         * Returns a new sink configured with the given schema. This instance is
         * left unchanged, so a sink that is already registered is not affected
         * by a later call.
         *
         * @param fieldNames names of the columns to emit
         * @param fieldTypes types of the columns to emit
         * @return a freshly configured sink
         */
        @Override
        public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return new MemoryAppendStreamTableSink(fieldNames, fieldTypes);
        }

        /**
         * Sink function that prints each row to standard output.
         *
         * <p>Declared {@code static} so it does not hold a hidden reference to
         * the enclosing table sink, which it never uses.
         */
        private static class DataSink extends RichSinkFunction<Row> {
            private static final long serialVersionUID = 1L;

            /**
             * Prints one result row.
             *
             * @param value   the row to print
             * @param context sink context (unused)
             */
            @Override
            public void invoke(Row value, Context context) throws Exception {
                System.out.println("Result:" + value);
            }
        }
    }
    

    运行结果

    例如:

    sand data:(5,iPhone11,10598)
    Result:41,iPhone11,13471
    sand data:(41,iPhone11,13471)
    Result:31,iPhone11 Pro Max,19726
    sand data:(31,iPhone11 Pro Max,19726)
    Result:16,iPhoneX,12939
    sand data:(16,iPhoneX,12939)
    Result:83,iPhone11,18861
    

    相关文章

      网友评论

        本文标题:快速构建Flink Streaming SQL测试用例

        本文链接:https://www.haomeiwen.com/subject/llwmoctx.html