Apache Flink Study Notes (Part 4)

Author: 憨人Zoe | Published: 2018-09-20 14:15

    This post shows how to use Flink SQL to implement the same functionality as demo5 from the previous post, Apache Flink Study Notes (Part 3).

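    Flink SQL is an abstraction layer that comes very close to the SQL of a relational database. SQL queries and Table API queries can be mixed seamlessly. A SQL query is specified with the sqlQuery() method of the TableEnvironment, and that method returns the result of the query as a Table.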

    Here is the code for demo6:

    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    
    import java.util.Date;
    import java.util.Properties;
    
    /**
     * Flink SQL
     */
    public class Demo6 {
        private static final String APP_NAME = "app_name";
    
        public static void main(String[] args) {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().enableSysoutLogging();
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // use processing time as the time characteristic for windows
            env.setParallelism(1); // global parallelism
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "kafka bootstrap.servers");
            // set the topic and app name
            // see Note 2 for the FlinkKafkaManager source
            FlinkKafkaManager manager = new FlinkKafkaManager("kafka.topic", APP_NAME, properties);
            FlinkKafkaConsumer09<JSONObject> consumer = manager.build(JSONObject.class);
            consumer.setStartFromLatest();
    
            // obtain the DataStream and convert each record to a Bean3
            DataStream<Bean3> stream = env.addSource(consumer).map(new FlatMap());
    
            final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
            tableEnvironment.registerDataStream("common", stream, "timestamp,appId,module,tt.proctime"); // register the stream as table "common"; tt is an appended processing-time attribute
    
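            // module must be wrapped in backticks, otherwise the query fails (presumably because MODULE is a reserved SQL keyword)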
            String sql = "SELECT appId, COUNT(`module`) AS totals FROM common WHERE appId = '100007336' OR appId = '100013668' GROUP BY TUMBLE(tt, INTERVAL '10' SECOND),appId";
            Table query = tableEnvironment.sqlQuery(sql);
    
            DataStream<Row> result = tableEnvironment.toAppendStream(query, Row.class);
            result.process(new ProcessFunction<Row, Object>() {
                @Override
                public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                    System.out.println(String.format("AppId:%s, Module Count:%s", value.getField(0).toString(), value.getField(1).toString()));
                }
            });
    
            try {
                env.execute(APP_NAME);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static class FlatMap implements MapFunction<JSONObject, Bean3> {
            @Override
            public Bean3 map(JSONObject jsonObject) throws Exception {
                return new Bean3(new Date().getTime(), jsonObject.getString("appId"), jsonObject.getString("module"));
            }
        }
    }
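
The listing above uses Bean3, which is defined in an earlier note of this series and not reproduced here. The following is only a hypothetical sketch of what such a class might look like, assuming it is a plain POJO whose field names match the ones passed to registerDataStream (timestamp, appId, module) and whose constructor matches the call in the map function. The real class may differ.

```java
/**
 * Hypothetical sketch of Bean3 (the original class is not shown in this post).
 * Assumption: it follows Flink's POJO conventions, i.e. a public class with a
 * public no-argument constructor and fields reachable through getters/setters,
 * so the Table API can resolve the fields by name.
 */
public class Bean3 {
    private long timestamp;  // creation time in epoch milliseconds
    private String appId;    // application id taken from the Kafka message
    private String module;   // module name taken from the Kafka message

    // Public no-argument constructor, required for POJO handling.
    public Bean3() {
    }

    // Matches the call new Bean3(long, String, String) in the map function.
    public Bean3(long timestamp, String appId, String module) {
        this.timestamp = timestamp;
        this.appId = appId;
        this.module = module;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getAppId() {
        return appId;
    }

    public void setAppId(String appId) {
        this.appId = appId;
    }

    public String getModule() {
        return module;
    }

    public void setModule(String module) {
        this.module = module;
    }
}
```

With a class shaped like this, the field expression "timestamp,appId,module,tt.proctime" refers to the three bean properties by name and appends tt as an extra processing-time column.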
    

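    Apart from the way the StreamTableEnvironment is handled, the rest of the code is almost unchanged. The point to note is that the window function from the Table API is expressed in SQL as TUMBLE(tt, INTERVAL '10' SECOND) (for a tumbling window).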
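
To make the grouping by TUMBLE(tt, INTERVAL '10' SECOND) and appId more concrete, here is a plain-Java sketch with no Flink dependency. It is not how Flink implements windows; it only illustrates the idea that every timestamp falls into exactly one fixed-size, non-overlapping window and that rows are counted per window and appId. The class TumbleSketch, its Event type and the sample timestamps are all hypothetical, and the sketch assumes non-negative timestamps and windows aligned to the epoch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustration only: counts events per (tumbling window, appId). */
class TumbleSketch {

    /** Hypothetical event: a timestamp in milliseconds plus an appId. */
    static class Event {
        final long timestampMs;
        final String appId;

        Event(long timestampMs, String appId) {
            this.timestampMs = timestampMs;
            this.appId = appId;
        }
    }

    /** Start of the window that contains timestampMs (assumes timestampMs >= 0). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Returns window start -> (appId -> number of events in that window). */
    static Map<Long, Map<String, Long>> countPerWindow(List<Event> events, long sizeMs) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestampMs, sizeMs);
            counts.computeIfAbsent(start, k -> new TreeMap<>())
                  .merge(e.appId, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000L, "100007336"),
                new Event(9_999L, "100007336"),
                new Event(10_000L, "100007336"),  // first event of the next window
                new Event(12_500L, "100013668"));
        // prints {0={100007336=2}, 10000={100007336=1, 100013668=1}}
        System.out.println(countPerWindow(events, 10_000L));
    }
}
```

Unlike this batch-style sketch, the streaming job emits one row per appId each time a 10-second window closes, and COUNT(`module`) only counts rows whose module is not null.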

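    For further details, refer to the official documentation (I have not read much of it myself; special requirements can be handled as they come up, haha).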
