Flink 1.13 SQL execution OOM troubleshooting

Author: 清蒸三文鱼_ | Published: 2021-07-19 20:03

    Background

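    We run Flink on YARN in cluster mode, and the applications on YARN kept failing with errors such as JobManager OOM, lost ZooKeeper heartbeats, slot allocation request timeouts, and HDFS "file already exists" errors. The investigation traced these back to how Flink SQL was parsed: statements using count or where were actually executed as full-table queries.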

    Maven

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_2.12</artifactId>
                <version>1.13.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_2.12</artifactId>
                <version>1.13.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.13.1</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.47</version>
            </dependency>
        </dependencies>
    

    demo

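    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcCountDemo {
        public static void main(String[] args) {
            // Blink planner; no mode is set, so the job runs in streaming mode
            TableEnvironment env = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().build());
            // Map the MySQL table `users` (database `test`) to the Flink table `my_table`
            env.executeSql("CREATE TABLE my_table (" +
                    "  id INT, name STRING, age INT" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:mysql://localhost:3306/test'," +
                    "  'table-name' = 'users'," +
                    "  'username' = 'root'," +
                    "  'password' = 'root'" +
                    ")");
            env.executeSql("show tables").print();
            // Only about 10 rows match, yet this is the query that ends up reading the whole table
            env.executeSql("select count(id) from my_table where name='jay' and age>=10").print();
            env.executeSql("select * from my_table").print();
        }
    }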
    

    Locating the cause

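    Analyzing the heap dump showed that almost the entire table was held in memory, even though with the where condition the query matches only about ten rows. Was the problem in the CREATE TABLE stage or in the SQL execution stage?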
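    To see why memory can hold nearly the whole table while only about ten rows match, here is a plain-Java sketch (not Flink code) contrasting two hypothetical plans: read every row and filter afterwards, versus evaluate the predicate where rows are produced. The class name FilterPlacementDemo, the synthetic 1,000-row table and its data are assumptions for illustration. The point is that the number of rows pulled from the source depends on where the filter runs, not on how many rows match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration only; it does not use or mimic Flink internals. */
public class FilterPlacementDemo {

    /** Hypothetical row type mirroring my_table (id INT, name STRING, age INT). */
    static final class User {
        final int id;
        final String name;
        final int age;

        User(int id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    /** Outcome of one simulated plan: rows pulled from the source and the final count. */
    static final class PlanResult {
        final int rowsFromSource;
        final long count;

        PlanResult(int rowsFromSource, long count) {
            this.rowsFromSource = rowsFromSource;
            this.count = count;
        }
    }

    /** Synthetic table: every 100th row is ("jay", 30); all other rows do not match. */
    static List<User> table(int size) {
        List<User> rows = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            rows.add(i % 100 == 0 ? new User(i, "jay", 30) : new User(i, "other", 5));
        }
        return rows;
    }

    /** The predicate of: where name='jay' and age>=10 */
    static final Predicate<User> WHERE = u -> "jay".equals(u.name) && u.age >= 10;

    /** Plan A: the source returns every row; filtering and counting happen afterwards. */
    static PlanResult filterAfterScan(List<User> table) {
        List<User> fetched = new ArrayList<>(table); // the whole table is materialized
        long count = fetched.stream().filter(WHERE).count();
        return new PlanResult(fetched.size(), count);
    }

    /** Plan B: the predicate is evaluated where the rows are produced. */
    static PlanResult filterAtSource(List<User> table) {
        List<User> fetched = new ArrayList<>();
        for (User u : table) {
            if (WHERE.test(u)) {
                fetched.add(u);
            }
        }
        return new PlanResult(fetched.size(), fetched.size());
    }

    public static void main(String[] args) {
        List<User> table = table(1_000);
        PlanResult a = filterAfterScan(table);
        PlanResult b = filterAtSource(table);
        System.out.println("filter after scan: rows fetched=" + a.rowsFromSource + ", count=" + a.count);
        System.out.println("filter at source:  rows fetched=" + b.rowsFromSource + ", count=" + b.count);
    }
}
```

    Both plans report count=10, but plan A pulls all 1,000 rows from the source while plan B pulls only the 10 matching ones, which is the pattern the heap dump suggested.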


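    After reading up on the relevant concepts in the official documentation, I debugged the job to look at the SQL that was actually executed, and found the clue: select count(id) from my_table where name='jay' and age>=10 had been turned into the statement shown in the figure below.

    [Figure: SQL parsing error]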
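    As far as we understand the Flink 1.13 JDBC connector, its scan source supports projection push-down but not filter push-down. The query sent to MySQL therefore lists only the columns the job needs (here id, name and age) and carries no WHERE clause, so filtering and counting happen inside Flink. The sketch below is a hypothetical plain-Java helper that imitates that behaviour to make the difference visible. It is not the connector's code, and the exact quoting and column order Flink emits may differ.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helpers imitating the scan query shape; not Flink connector code. */
public class ScanQuerySketch {

    /** Quotes an identifier MySQL-style, with backticks. */
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /**
     * Scan query for a source that only supports projection push-down:
     * the predicate is accepted but cannot be used, so it never reaches the database.
     */
    static String scanQuery(String table, List<String> projectedColumns, String ignoredPredicate) {
        String columns = projectedColumns.stream()
                .map(ScanQuerySketch::quote)
                .collect(Collectors.joining(", "));
        return "SELECT " + columns + " FROM " + quote(table);
    }

    /** Scan query for a source that also supports filter push-down. */
    static String scanQueryWithFilter(String table, List<String> projectedColumns, String predicate) {
        return scanQuery(table, projectedColumns, predicate) + " WHERE " + predicate;
    }

    public static void main(String[] args) {
        // Columns needed by: select count(id) from my_table where name='jay' and age>=10
        List<String> needed = Arrays.asList("id", "name", "age");
        String predicate = "`name` = 'jay' AND `age` >= 10";
        System.out.println(scanQuery("users", needed, predicate));
        System.out.println(scanQueryWithFilter("users", needed, predicate));
    }
}
```

    The first line printed, SELECT `id`, `name`, `age` FROM `users`, is a full-table read no matter how selective the original where clause is.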

    Configuring JVM options in Flink

    1. Add a line to flink-conf.yaml: env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp
    2. For Flink on YARN, add this to the submission script: -yD env.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"
    3. On CDH, search the configuration for the keyword jvm
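    After restarting, it is worth confirming that the options actually reached the JVM. Below is a minimal sketch, assuming a HotSpot-based JDK (it uses com.sun.management.HotSpotDiagnosticMXBean, which OpenJDK and Oracle JDK provide). It reads the effective HeapDumpOnOutOfMemoryError flag and looks for a -XX:HeapDumpPath= start-up argument. Running main checks only its own JVM. To check a TaskManager, the same calls would have to run inside that process, for example from a user function's open() method. The class name is hypothetical. Since the background also mentions JobManager OOMs, note that Flink has a matching env.java.opts.jobmanager key for the JobManager JVM.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

/** Hypothetical helper: reports whether heap dumps on OOM are enabled in this JVM. */
public class HeapDumpFlagsCheck {

    /** Current value of a HotSpot VM flag in this JVM, e.g. "true" or "false" for booleans. */
    static String vmOption(String name) {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        return bean.getVMOption(name).getValue();
    }

    /** True if this JVM writes a heap dump when it throws OutOfMemoryError. */
    static boolean heapDumpOnOomEnabled() {
        return Boolean.parseBoolean(vmOption("HeapDumpOnOutOfMemoryError"));
    }

    /** Value of a -XX:HeapDumpPath=... start-up argument, or null if none was passed. */
    static String heapDumpPathArgument() {
        String prefix = "-XX:HeapDumpPath=";
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        for (String arg : jvmArgs) {
            if (arg.startsWith(prefix)) {
                return arg.substring(prefix.length());
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("HeapDumpOnOutOfMemoryError = " + heapDumpOnOomEnabled());
        System.out.println("HeapDumpPath argument = " + heapDumpPathArgument());
    }
}
```

    Started with the options from step 1, it should print true and /tmp. Without them it prints false and null.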


    Collected exceptions

    ZooKeeper NoNode
    Recovery is suppressed by NoRestartBackoffTimeStrategy
    NoResourceAvailableException: Could not allocate the required slot within slot request timeout
    org.apache.hadoop.fs.FileAlreadyExistsException
    REST port out of range
    Heartbeat of TaskManager with id container_1626774674426_0494_01_000002(xx:8041) timed out
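    When several of these errors appear together, a quick way to see which ones dominate is to scan the container logs for the signatures above. Below is a minimal plain-Java sketch. The signature substrings come from the messages collected here, plus OutOfMemoryError for the OOM itself. The class name, and the assumption that the logs have first been saved to a local file (for example from the yarn logs command), are illustrative.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: counts log lines matching the exception signatures collected above. */
public class ExceptionSignatureScan {

    /** Substrings of the collected exceptions, plus OutOfMemoryError for the OOM itself. */
    static final List<String> SIGNATURES = Arrays.asList(
            "NoNode",
            "Recovery is suppressed by NoRestartBackoffTimeStrategy",
            "NoResourceAvailableException",
            "FileAlreadyExistsException",
            "Heartbeat of TaskManager with id",
            "OutOfMemoryError");

    /** For each signature, the number of lines containing it (signature order is preserved). */
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> hits = new LinkedHashMap<>();
        for (String signature : SIGNATURES) {
            hits.put(signature, 0);
        }
        for (String line : lines) {
            for (String signature : SIGNATURES) {
                if (line.contains(signature)) {
                    hits.merge(signature, 1, Integer::sum);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ExceptionSignatureScan <saved-log-file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        count(lines).forEach((signature, n) -> System.out.println(n + "\t" + signature));
    }
}
```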

    Article link: https://www.haomeiwen.com/subject/qjvhmltx.html