根据官网Structure of Table API and SQL Programs的样例介绍,如果要测试一个streaming SQL,需要注册一个table source,然后执行一个SQL,最后在注册一个table sink并将结果插入。table source和table sink有很多种,本文将介绍基于内存的table source和sink方便进行测试sql。代码传送门
测试用例以及Table source
public class StreamingBaseSqlDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(2);
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(new DataSource());
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
Table result = tableEnv.sqlQuery("SELECT user, product, amount FROM Orders");
String[] fieldNames = {"user", "product", "amount"};
TypeInformation[] fieldTypes = {Types.LONG, Types.STRING, Types.INT};
TableSink sink = new MemoryAppendStreamTableSink(fieldNames, fieldTypes);
tableEnv.registerTableSink("output", sink);
result.insertInto("output");
env.execute();
}
private static class DataSource extends RichParallelSourceFunction<Tuple3<Long, String, Integer>> {
private volatile boolean running = true;
@Override
public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
String[] products = new String[]{"iPhoneX", "iPhone11", "iPhone11 Pro Max"};
final long numElements = 20;
int i = 0;
while (running && i < numElements) {
Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
Tuple3 data = new Tuple3<Long, String, Integer>(RandomUtils.nextLong(1, 100), products[RandomUtils.nextInt(0, 3)],RandomUtils.nextInt(10000, 20000));
ctx.collect(data);
System.out.println("sand data:" + data);
i++;
}
}
@Override
public void cancel() {
running = false;
}
}
}
Table sink
public class MemoryAppendStreamTableSink implements AppendStreamTableSink<Row> {
private String[] fieldNames;
private TypeInformation<?>[] fieldTypes;
public MemoryAppendStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
}
@Override
public void emitDataStream(DataStream<Row> dataStream) {
consumeDataStream(dataStream);
}
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.addSink(new DataSink()).setParallelism(dataStream.getParallelism());
}
@Override
public TypeInformation<Row> getOutputType() {
return new RowTypeInfo(getFieldTypes(), getFieldNames());
}
@Override
public String[] getFieldNames() {
return fieldNames;
}
@Override
public TypeInformation<?>[] getFieldTypes() {
return fieldTypes;
}
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
return this;
}
private class DataSink extends RichSinkFunction<Row> {
public DataSink() {
}
@Override
public void invoke(Row value, Context context) throws Exception {
System.out.println("Result:" + value);
}
}
}
运行结果
例如:
sand data:(5,iPhone11,10598)
Result:41,iPhone11,13471
sand data:(41,iPhone11,13471)
Result:31,iPhone11 Pro Max,19726
sand data:(31,iPhone11 Pro Max,19726)
Result:16,iPhoneX,12939
sand data:(16,iPhoneX,12939)
Result:83,iPhone11,18861
网友评论