脚本
-c,--class <classname> 程序的入口(main method or getplan()).只有在jar程序的manifest中没有指定class
-m,--jobmanager <host:port> 在哪运行yarn-cluster
-C,--classpath <url> 代码路径
-p,--parallelism <parallelism> 并行度
-ynm,--yarnname <arg> 设置application的名字
-yjm,--yarnjobManagerMemory <arg> JobManager Container的内存
-ytm,--yarntaskManagerMemory <arg> TaskManager Container的内存
-s,--fromSavepoint <savepointPath> savepoint保存的地方,路径需写到chk-某个数
-yn,--yarncontainer <arg> Number of YARN container to allocate(=Number of Task Managers)
-ys,--yarnslots <arg> Number of slots per TaskManager
-yq -yD env.java.opts.taskmanager="-Dsun.stdout.encoding=utf-8"
代码中配置
//状态管理器MemoryStateBackend,FsStateBackend,RocksDBStateBackend后俩需要指定路径
env.setStateBackend(stateBackend);
//设置保存间隔,每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
//exactly-ance 和 at-least-once 语义选择,设置模式为精确一次 (这是默认值)
env.enableCheckpointing(10,EXACTLY_ONCE);
//checkpoint最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//Checkpoint 超时时间,Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
//最大并行执行的检查点数量,默认是一个
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//开启在 job 中止后仍然保留的 externalized checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//当checkpoint出现错误时是否关闭应用,默认是true,我们可以手动设置为false
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
//默认的重启策略是:固定延迟无限重启
//此处设置重启策略为:出现异常重启1次,隔5秒一次
bsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.seconds(5)));
//设置任务处理的时间,事件时间,注入时间,处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
网友评论