美文网首页大数据
flink定时读取mysql

flink定时读取mysql

作者: apophisdeity | 来源:发表于2022-04-13 10:12 被阅读0次

    生产上有时候需要定期新增或修改某些筛选值.为了避免频繁重启任务,修改代码,可以将这些配置信息放在mysql表中定期更新,然后通过发送广播流给下游.这样可以减少更改频率,降低重启风险.

    public class MysqlSource extends RichSourceFunction<Map<String, ValueConf>> {
        // 定时器间隔时间(ms)
        private static final CountDownLatch countDownLatch = new CountDownLatch(1);
        private final String sql;
        private QueryRunner qr;
    
        public SignalSource(String sql) {
            this.sql = sql;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // 此处需要提前将mysql配置信息存储全局变量中 env.getConfig().setGlobalJobParameters(params);
            ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            String jdbcUrl = params.getRequired("mysql.jdbc.url");
            String userName = params.getRequired("mysql.username");
            String password = params.getRequired("mysql.password");
            qr = new QueryRunner(JDBCUtils.getDataSource(jdbcUrl, userName, password));
        }
    
        @Override
        public void run(SourceContext<Map<String, ValueConf>> ctx) throws Exception {
            while (true) {
                Map<String, ValueConf> rs = qr.query(sql, new BeanListHandler<>(ValueConf.class)).stream().collect(Collectors.toMap(ValueConf::getCode, i -> i));
                ctx.collect(rs);
                // 每隔12小时执行一次
                countDownLatch.await(12, TimeUnit.HOURS);
            }
        }
    
        @Override
        public void cancel() {
    
        }
    }
    

    将mysql数据源并行度设置为1,只需要一个tm读取mysql,减少了mysql连接数

    MapStateDescriptor<String, ValueConf> descriptor=new MapStateDescriptor<>("mysql-descriptor",String.class,ValueConf.class);
    BroadcastStream<Map<String, ValueConf>> mysqlSource=env.addSource(new MysqlSource("querySql")).uid("MysqlSource").name("MysqlSource").setParallelism(1)
    .broadcast(descriptor);
    

    依赖
    除了flink相关jar包,还需要mysql的,以下是我用的

        <properties>
            <commons.dbutils.version>1.6</commons.dbutils.version>
            <mysql.connector.version>8.0.16</mysql.connector.version>
            <hikaricp.version>4.0.3</hikaricp.version>
        </properties>
            <!--MySQL-java连接驱动-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.connector.version}</version>
            </dependency>
            <dependency>
                <groupId>commons-dbutils</groupId>
                <artifactId>commons-dbutils</artifactId>
                <version>${commons.dbutils.version}</version>
            </dependency>
            <dependency>
                <groupId>com.zaxxer</groupId>
                <artifactId>HikariCP</artifactId>
                <version>${hikaricp.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    

    相关文章

      网友评论

        本文标题:flink定时读取mysql

        本文链接:https://www.haomeiwen.com/subject/ymoosrtx.html