美文网首页
flink配置

flink配置

作者: 神呐_宽恕我把 | 来源:发表于2020-11-05 11:07 被阅读0次

    脚本

    -c,--class <classname> 程序的入口(main method or getplan()).只有在jar程序的manifest中没有指定class

    -m,--jobmanager <host:port> 在哪运行yarn-cluster

    -C,--classpath <url> 代码路径

    -p,--parallelism <parallelism> 并行度

    -ynm,--yarnname <arg> 设置application的名字

    -yjm,--yarnjobManagerMemory <arg> JobManager Container的内存

    -ytm,--yarntaskManagerMemory <arg> TaskManager Container的内存

    -s,--fromSavepoint <savepointPath> savepoint保存的地方,路径需写到chk-某个数

    -yn,--yarncontainer <arg> Number of YARN container to allocate(=Number of Task Managers)

    -ys,--yarnslots <arg> Number of slots per TaskManager

    -yq -yD env.java.opts.taskmanager="-Dsun.stdout.encoding=utf-8"

    代码中配置

    //状态管理器MemoryStateBackend,FsStateBackend,RocksDBStateBackend后俩需要指定路径

    env.setStateBackend(stateBackend);

    //设置保存间隔,每 1000ms 开始一次 checkpoint

    env.enableCheckpointing(1000);

    //exactly-ance 和 at-least-once 语义选择,设置模式为精确一次 (这是默认值)

    env.enableCheckpointing(10,EXACTLY_ONCE);

    //checkpoint最小时间间隔

    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

    //Checkpoint 超时时间,Checkpoint 必须在一分钟内完成,否则就会被抛弃

    env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

    //最大并行执行的检查点数量,默认是一个

    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    //开启在 job 中止后仍然保留的 externalized checkpoints

    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    //当checkpoint出现错误时是否关闭应用,默认是true,我们可以手动设置为false

    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

    //默认的重启策略是:固定延迟无限重启

    //此处设置重启策略为:出现异常重启1次,隔5秒一次

    bsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(5)));

    //设置任务处理的时间,事件时间,注入时间,处理时间

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    相关文章

      网友评论

          本文标题:flink配置

          本文链接:https://www.haomeiwen.com/subject/oyvgvktx.html