美文网首页
Flinksql ---join

Flinksql ---join

作者: wudl | 来源:发表于2021-08-20 00:33 被阅读0次

    1.Flink sql 之join

    1.1 两个动态流转化为sql 的表进行动态关联

    需要主要的是:

     //默认值为0   表示FlinkSQL中的状态永久保存
    System.out.println(tableEnv.getConfig().getIdleStateRetention());
    //执行FLinkSQL状态保留10秒 输出的值保持时间,以最终的输入时间算起
    tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
    

    代码

    package com.wudl.flink.sql;
    
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    import java.time.Duration;
    
    /**
     * @ClassName : Flink_SQL_join
     * @Description : Flink -sql  动态表关联
     * @Author :wudl
     * @Date: 2021-08-20 00:16
     */
    
    public class Flink_SQL_join {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //默认值为0   表示FlinkSQL中的状态永久保存
            System.out.println(tableEnv.getConfig().getIdleStateRetention());
    
            //执行FLinkSQL状态保留10秒
            tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
    
    
            //2.读取端口数据创建流
            SingleOutputStreamOperator<TableA> aDS = env.socketTextStream("192.168.1.180", 8888)
                    .map(line -> {
                        String[] split = line.split(",");
                        return new TableA(split[0], split[1]);
                    });
            SingleOutputStreamOperator<TableB> bDS = env.socketTextStream("192.168.1.180", 9999)
                    .map(line -> {
                        String[] split = line.split(",");
                        return new TableB(split[0], Integer.parseInt(split[1]));
                    });
    
            // 3. 将流转化为动态表
            tableEnv.createTemporaryView("tabA", aDS);
            tableEnv.createTemporaryView("tabB", bDS);
            // 4. 双流join
            tableEnv.sqlQuery("select * from tabA a left join tabB b on a.id = b.id").execute().print();
    
            env.execute();
    
        }
    }
    
    
    package com.wudl.flink.sql;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @ClassName : TableA
     * @Description :
     * @Author :wudl
     * @Date: 2021-08-20 00:18
     */
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class TableA {
        private String id;
        private String name;
    }
    
    
    package com.wudl.flink.sql;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @ClassName : TableB
     * @Description :
     * @Author :wudl
     * @Date: 2021-08-20 00:18
     */
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class TableB {
        private String id;
        private int classId;
    }
    
    
    Flink-join.png

    相关文章

      网友评论

          本文标题:Flinksql ---join

          本文链接:https://www.haomeiwen.com/subject/mkuzbltx.html