本篇将演示如何使用 Flink SQL 实现上一篇demo5
的功能,上一篇传送门 Apache Flink 学习笔记(三)
Flink SQl
是无限接近关系型数据库sql语句的抽象模块,SQL
和Table API
查询可以无缝混合,SQL
查询是使用sqlQuery()
方法指定的TableEnvironment
,该方法返回SQL
查询的结果为Table
。
直接上代码demo6
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.util.Date;
/**
* Flink SQL
*/
public class Demo6 {
private static final String APP_NAME = "app_name";
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableSysoutLogging();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //设置窗口的时间单位为process time
env.setParallelism(1);//全局并发数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka bootstrap.servers");
//设置topic和 app name
//FlinkKafkaManager 源码见笔记二
FlinkKafkaManager manager = new FlinkKafkaManager("kafka.topic", APP_NAME, properties);
FlinkKafkaConsumer09<JSONObject> consumer = manager.build(JSONObject.class);
consumer.setStartFromLatest();
//获取DataStream,并转成Bean3
DataStream<Bean3> stream = env.addSource(consumer).map(new FlatMap());
final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
tableEnvironment.registerDataStream("common", stream, "timestamp,appId,module,tt.proctime");//注册表名
//module 必须加``,否则报错
String sql = "SELECT appId, COUNT(`module`) AS totals FROM common WHERE appId = '100007336' OR appId = '100013668' GROUP BY TUMBLE(tt, INTERVAL '10' SECOND),appId";
Table query = tableEnvironment.sqlQuery(sql);
DataStream<Row> result = tableEnvironment.toAppendStream(query, Row.class);
result.process(new ProcessFunction<Row, Object>() {
@Override
public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
System.out.println(String.format("AppId:%s, Module Count:%s", value.getField(0).toString(), value.getField(1).toString()));
}
});
try {
env.execute(APP_NAME);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class FlatMap implements MapFunction<JSONObject, Bean3> {
@Override
public Bean3 map(JSONObject jsonObject) throws Exception {
return new Bean3(new Date().getTime(), jsonObject.getString("appId"), jsonObject.getString("module"));
}
}
}
除了StreamTableEnvironment
处理不一样,其余代码几乎没有改变,这里需要注意的是,Table API
中的window
函数在SQL
里表现为TUMBLE(tt, INTERVAL '10' SECOND)
(针对滚动窗口)
更多细节部分,参考官网文档即可(我自己也没怎么看,特殊需求特殊对待,哈哈)。
网友评论