For single-node setups, Flink is ready to go out of the box and you do not need to change the default configuration to get started.
The out-of-the-box configuration uses your default Java installation. You can manually override the Java runtime to use by setting the JAVA_HOME environment variable or the env.java.home key in conf/flink-conf.yaml.
This page lists the most common options that are typically needed to set up a well performing (distributed) installation. In addition, the full list of all available configuration parameters can be found in the Full Reference section below.
All configuration is done in conf/flink-conf.yaml, which is expected to be a flat collection of YAML key-value pairs in the format key: value.
The system and run scripts parse the configuration at startup time. Changes to the configuration file require restarting the Flink JobManager and TaskManagers.
The configuration files for the TaskManagers can be different; Flink does not assume uniform machines in the cluster.
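As a minimal sketch of that format (the keys are described later on this page; the values shown here are only illustrative), a conf/flink-conf.yaml might look like this:

```yaml
# conf/flink-conf.yaml -- a flat collection of YAML key: value pairs
# (values below are examples, not recommendations)
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
```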
Common Options
- env.java.home: The path to the Java installation to use (DEFAULT: the system's default Java installation, if found). Needs to be specified if the startup scripts fail to automatically resolve the Java home directory. Can be specified to point to a specific Java installation or version. If this option is not specified, the startup scripts also evaluate the $JAVA_HOME environment variable.
- env.java.opts: Set custom JVM options. This value is respected by Flink's start scripts, both the JobManager and TaskManager, and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. Enclosing options in double quotes delays parameter substitution, which allows access to variables from Flink's startup scripts. Use env.java.opts.jobmanager and env.java.opts.taskmanager for JobManager- or TaskManager-specific options, respectively.
- env.java.opts.jobmanager: JobManager-specific JVM options. These are used in addition to the regular env.java.opts.
- env.java.opts.taskmanager: TaskManager-specific JVM options. These are used in addition to the regular env.java.opts.
- jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.
- jobmanager.rpc.port: The port number of the JobManager (DEFAULT: 6123).
- jobmanager.heap.mb: JVM heap size (in megabytes) for the JobManager. You may have to increase the heap size for the JobManager if you are running very large applications (with many operators), or if you are retaining a long history of them.
- taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible. If the cluster exclusively runs Flink, the total amount of available memory per machine minus some memory for the operating system (maybe 1-2 GB) is a good value. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
- taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores). See the section on configuring TaskManager processing slots at the end of this page for more details.
- parallelism.default: The default parallelism to use for programs that have no parallelism specified (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution, as shown in the sample configuration after this list. Note: The default parallelism can be overwritten for an entire job by calling setParallelism(int parallelism) on the ExecutionEnvironment or by passing -p <parallelism> to the Flink command-line frontend. It can be overwritten for single transformations by calling setParallelism(int parallelism) on an operator. See the documentation on parallel execution for more information about parallelism.
- fs.default-scheme: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed). By default, this is set to file:///, which points to the local filesystem. This means that the local filesystem is going to be used to search for user-specified files without an explicit scheme definition. As another example, if this is set to hdfs://localhost:9000/, then a user-specified file path without an explicit scheme, such as /user/USERNAME/in.txt, is going to be transformed into hdfs://localhost:9000/user/USERNAME/in.txt. This scheme is used only if no other scheme is specified (explicitly) in the user-provided URI.
- fs.hdfs.hadoopconf: The absolute path to the Hadoop File System's (HDFS) configuration directory (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (hdfs:///path/to/files, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like hdfs://address:port/path/to/files. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory.
- classloader.resolve-order: Whether Flink should use a child-first ClassLoader or a parent-first ClassLoader when loading user-code classes. Can be one of parent-first or child-first. (DEFAULT: child-first)
- classloader.parent-first-patterns: A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader. A pattern is a simple prefix match against the fully qualified class name. By default, this is set to java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback. If you want to change this setting and keep the default behavior, you have to make sure that your pattern list includes the default patterns.
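Putting the most common options together, a sketch of a flink-conf.yaml for a small standalone cluster could look as follows (host name and sizes are illustrative assumptions, not recommendations):

```yaml
# Illustrative configuration for a small cluster with 3 TaskManagers
jobmanager.rpc.address: master-host    # must be reachable by all nodes and clients
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 4096
taskmanager.numberOfTaskSlots: 4       # e.g. one slot per physical CPU core
parallelism.default: 12                # NumTaskManagers * NumSlotsPerTaskManager = 3 * 4
```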
Advanced Options
Compute
- taskmanager.compute.numa: When enabled a TaskManager is started on each NUMA node for each worker listed in conf/slaves (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.
Managed Memory
By default, Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.heap.mb minus memory used for network buffers) for its managed memory. Managed memory helps Flink to run batch operators efficiently. It prevents OutOfMemoryExceptions because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data into Java objects. All in all, managed memory improves the robustness and speed of the system.
The default fraction for managed memory can be adjusted using the taskmanager.memory.fraction parameter. An absolute value may be set using taskmanager.memory.size (this overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap, which may improve performance in setups with large memory sizes. A sample configuration follows the parameter list below.
- taskmanager.memory.size: The amount of memory (in megabytes) that the TaskManager reserves on-heap or off-heap (depending on taskmanager.memory.off-heap) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the TaskManager JVM heap size, as specified by taskmanager.memory.fraction. (DEFAULT: -1)
- taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the TaskManager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a TaskManager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving the remaining 20% for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated if taskmanager.memory.size is not set.
- taskmanager.memory.off-heap: If set to true, the TaskManager allocates the memory used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory. (DEFAULT: false)
- taskmanager.memory.segment-size: The size of the memory buffers used by the memory manager and the network stack, in bytes. (DEFAULT: 32768 (= 32 KB))
- taskmanager.memory.preallocate: Can be either true or false. Specifies whether the TaskManager should allocate all of its managed memory when starting up. (DEFAULT: false) When taskmanager.memory.off-heap is set to true, it is advised that this configuration is also set to true. If set to false, the allocated off-heap memory is only cleaned up when the configured JVM parameter MaxDirectMemorySize is reached and a full GC is triggered. Note: For streaming setups, we highly recommend setting this value to false, as the core state backends currently do not use the managed memory.
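As a sketch of the managed-memory parameters described above (the values are illustrative assumptions; whether off-heap memory is beneficial depends on your setup):

```yaml
# Illustrative managed-memory settings for a batch-oriented TaskManager
taskmanager.memory.off-heap: true      # allocate managed memory outside the JVM heap
taskmanager.memory.preallocate: true   # advised when off-heap memory is enabled
taskmanager.memory.fraction: 0.7       # only evaluated if taskmanager.memory.size is not set
# taskmanager.memory.size: 2048        # absolute size in MB; overrides the fraction
```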
Memory and Performance Debugging
These options are useful for debugging memory and garbage collection related issues in a Flink application, such as poor performance or processes dying or throwing exceptions because of an OutOfMemoryError.
- taskmanager.debug.memory.startLogThread: Causes the TaskManagers to periodically log memory and garbage collection statistics. The statistics include current heap, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.
- taskmanager.debug.memory.logIntervalMs: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect if taskmanager.debug.memory.startLogThread is set to true.
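For example, a minimal sketch that enables the periodic memory/GC log (the interval value is an illustrative choice):

```yaml
# Log memory and GC statistics every 10 seconds
taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 10000
```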
Other
- taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir).
- taskmanager.log.path: The config parameter defining the taskmanager log file location.
- jobmanager.web.address: Address of the JobManager's web interface (DEFAULT: anyLocalAddress()).
- jobmanager.web.port: Port of the JobManager's web interface (DEFAULT: 8081).
- jobmanager.web.tmpdir: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface will copy its static files into the directory. Uploaded job jars are also stored in the directory if not overridden. By default, the temporary directory is used.
- jobmanager.web.upload.dir: The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by jobmanager.web.tmpdir.
- fs.overwrite-files: Specifies whether file output writers should overwrite existing files by default. Set to true to overwrite by default, false otherwise. (DEFAULT: false)
- fs.output.always-create-directory: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to true, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to false, the writer will create the file directly at the output path, without creating a containing directory. (DEFAULT: false)
- taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)
- taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)
- taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)
- state.backend: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:
  - jobmanager: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
  - filesystem: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, …
- state.backend.fs.checkpointdir: Directory for storing checkpoints in a Flink supported filesystem. Note: The state backend must be accessible from the JobManager; use file:// only for local setups.
- state.backend.rocksdb.checkpointdir: The local directory for storing RocksDB files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is taskmanager.tmp.dirs)
- state.checkpoints.dir: The target directory for meta data of externalized checkpoints.
- state.checkpoints.num-retained: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoint if the latest checkpoint is corrupt. (DEFAULT: 1)
- high-availability.zookeeper.storageDir: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named recovery.zookeeper.storageDir.
- blob.storage.directory: Directory for storing blobs (such as user JARs) on the TaskManagers.
- blob.service.cleanup.interval: Cleanup interval (in seconds) of transient blobs at the server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour). Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and let the periodic cleanup task (executed every blob.service.cleanup.interval seconds) remove them after this TTL has passed. We do the same for transient blob files at both server and caches, but immediately after accessing them, i.e. after a put or get operation. This means that a blob will be retained at most 2 * blob.service.cleanup.interval seconds after not being referenced anymore (permanent blobs) or after their last access (transient blobs). For permanent blobs, this means that a recovery still has the chance to use existing files rather than downloading them again.
- blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.
- blob.service.ssl.enabled: Flag to enable SSL for the blob client/server communication. This is applicable only when the global SSL flag security.ssl.enabled is set to true (DEFAULT: true).
- restart-strategy: Default restart strategy to use in case no restart strategy has been specified for the job. The options are:
  - fixed delay strategy: fixed-delay.
  - failure rate strategy: failure-rate.
  - no restarts: none

  Default value is none, unless checkpointing is enabled for the job, in which case the default is fixed-delay with Integer.MAX_VALUE restart attempts and a 10 s delay.
- restart-strategy.fixed-delay.attempts: Number of restart attempts, used if the default restart strategy is set to "fixed-delay". Default value is 1, unless "fixed-delay" was activated by enabling checkpoints, in which case the default is Integer.MAX_VALUE.
- restart-strategy.fixed-delay.delay: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay". (DEFAULT: 1 s)
- restart-strategy.failure-rate.max-failures-per-interval: Maximum number of restarts in a given time interval before failing a job in the "failure-rate" strategy. Default value is 1.
- restart-strategy.failure-rate.failure-rate-interval: Time interval for measuring the failure rate in the "failure-rate" strategy. Default value is 1 minute.
- restart-strategy.failure-rate.delay: Delay between restart attempts, used if the default restart strategy is set to "failure-rate". Default value is the akka.ask.timeout.
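The checkpointing and restart-strategy keys above are often configured together. A sketch, assuming a hypothetical HDFS namenode address and an explicitly chosen fixed-delay strategy (all values are illustrative):

```yaml
# Illustrative checkpointing and restart-strategy configuration
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://namenode:9000/flink/checkpoints   # hypothetical path
state.checkpoints.dir: hdfs://namenode:9000/flink/ext-checkpoints        # hypothetical path
state.checkpoints.num-retained: 2

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```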
Full Reference
HDFS
These parameters configure the default HDFS used by Flink. Setups that do not specify an HDFS configuration have to specify the full path to HDFS files (hdfs://address:port/path/to/files). Files will also be written with default HDFS parameters (block size, replication factor).
- fs.hdfs.hadoopconf: The absolute path to the Hadoop configuration directory. The system will look for the "core-site.xml" and "hdfs-site.xml" files in that directory (DEFAULT: null).
- fs.hdfs.hdfsdefault: The absolute path of Hadoop's own configuration file "hdfs-default.xml" (DEFAULT: null).
- fs.hdfs.hdfssite: The absolute path of Hadoop's own configuration file "hdfs-site.xml" (DEFAULT: null).
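A sketch of the common case where an existing Hadoop installation already provides these files (the directory path is an assumption for illustration; use your cluster's actual Hadoop configuration directory):

```yaml
# Point Flink at an existing Hadoop configuration directory
fs.hdfs.hadoopconf: /etc/hadoop/conf
```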
JobManager & TaskManager
The following parameters configure Flink’s JobManager and TaskManagers.
- jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.
- jobmanager.rpc.port: The port number of the JobManager (DEFAULT: 6123).
- taskmanager.hostname: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.
- taskmanager.rpc.port: The task manager's IPC port (DEFAULT: 0, which lets the OS choose a free port). Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.
- taskmanager.data.port: The task manager's port used for data exchange operations (DEFAULT: 0, which lets the OS choose a free port).
- taskmanager.data.ssl.enabled: Enable SSL support for the taskmanager data transport. This is applicable only when the global SSL flag security.ssl.enabled is set to true (DEFAULT: true).
- jobmanager.heap.mb: JVM heap size (in megabytes) for the JobManager (DEFAULT: 256).
- taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: 512). On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.
- taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
- taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system's tmp dir).
- taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note that taskmanager.network.memory.min and taskmanager.network.memory.max may override this fraction. (DEFAULT: 0.1)
- taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.
- taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.
- taskmanager.network.numberOfBuffers (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048). If set, it will be mapped to taskmanager.network.memory.min and taskmanager.network.memory.max based on taskmanager.memory.segment-size.
- taskmanager.memory.size: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by taskmanager.memory.fraction. (DEFAULT: -1)
- taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a task manager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated if taskmanager.memory.size is not set.
- taskmanager.debug.memory.startLogThread: Causes the TaskManagers to periodically log memory and garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.
- taskmanager.debug.memory.logIntervalMs: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect if taskmanager.debug.memory.startLogThread is set to true.
- taskmanager.maxRegistrationDuration: Defines the maximum time it can take for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates. The max registration duration requires a time unit specifier (ms/s/min/h/d) (e.g. "10 min"). (DEFAULT: Inf)
- taskmanager.initial-registration-pause: The initial registration pause between two consecutive registration attempts. The pause is doubled for each new registration attempt until it reaches the maximum registration pause. The initial registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: 500 ms)
- taskmanager.max-registration-pause: The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: 30 s)
- taskmanager.refused-registration-pause: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: 10 s)
- taskmanager.jvm-exit-on-oom: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an OutOfMemoryError (DEFAULT: false).
- blob.fetch.retries: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: 50).
- blob.fetch.num-concurrent: The number of concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: 50).
- blob.fetch.backlog: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: 1000).
- task.cancellation-interval: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: 30000).
- taskmanager.exit-on-fatal-akka-error: Whether the TaskManager shall be terminated in case of a fatal Akka error (quarantining event). (DEFAULT: false)
- jobmanager.tdd.offload.minsize: Maximum size of the TaskDeploymentDescriptor's serialized task and job information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server. (DEFAULT: 1 KiB)
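If several TaskManagers run on the same machine, the port-related keys above can be pinned to ranges, as the descriptions suggest. A brief sketch (the range is an arbitrary example):

```yaml
# Avoid port collisions between co-located TaskManagers
taskmanager.rpc.port: 50100-50200   # a list, a range, or a combination is accepted
taskmanager.data.port: 0            # 0 lets the OS pick a free port
```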
High Availability (HA)
- high-availability: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
  - none (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
  - zookeeper: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the high-availability.zookeeper.quorum configuration value.

  Previously this key was named recovery.mode and the default value was standalone.
- high-availability.cluster-id: (Default /default_ns in standalone cluster mode, or the <yarn-application-id> under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows isolating multiple applications on the same ZooKeeper. Previously this key was named recovery.zookeeper.path.namespace and high-availability.zookeeper.path.namespace.
ZooKeeper-based HA Mode
- high-availability.zookeeper.quorum: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' HA mode is selected. Previously this key was named recovery.zookeeper.quorum.
- high-availability.zookeeper.path.root: (Default /flink) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named recovery.zookeeper.path.root.
- high-availability.zookeeper.path.latch: (Default /leaderlatch) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named recovery.zookeeper.path.latch.
- high-availability.zookeeper.path.leader: (Default /leader) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named recovery.zookeeper.path.leader.
- high-availability.storageDir: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named recovery.zookeeper.storageDir and high-availability.zookeeper.storageDir.
- high-availability.zookeeper.client.session-timeout: (Default 60000) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named recovery.zookeeper.client.session-timeout.
- high-availability.zookeeper.client.connection-timeout: (Default 15000) Defines the connection timeout for ZooKeeper in ms. Previously this key was named recovery.zookeeper.client.connection-timeout.
- high-availability.zookeeper.client.retry-wait: (Default 5000) Defines the pause between consecutive retries in ms. Previously this key was named recovery.zookeeper.client.retry-wait.
- high-availability.zookeeper.client.max-retry-attempts: (Default 3) Defines the number of connection retries before the client gives up. Previously this key was named recovery.zookeeper.client.max-retry-attempts.
- high-availability.job.delay: (Default akka.ask.timeout) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named recovery.job.delay.
- high-availability.zookeeper.client.acl: (Default open) Defines the ACL (open|creator) to be configured on the ZK node. The configuration value can be set to "creator" if the ZooKeeper server configuration has the "authProvider" property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos). The ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
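Putting these keys together, a sketch of a ZooKeeper-based HA setup might look as follows (the quorum hosts, cluster id, and storage path are illustrative assumptions):

```yaml
# Illustrative ZooKeeper-based high availability configuration
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-flink-cluster
high-availability.storageDir: hdfs://namenode:9000/flink/ha   # must be accessible from the JobManagers
```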
ZooKeeper Security
- zookeeper.sasl.disable: (Default: true) Defines if SASL based authentication needs to be enabled or disabled. The configuration value can be set to "true" if the ZooKeeper cluster is running in secure mode (Kerberos).
- zookeeper.sasl.service-name: (Default: zookeeper) If the ZooKeeper server is configured with a different service name (default: "zookeeper") then it can be supplied using this configuration. A mismatch in service name between client and server configuration will cause the authentication to fail.
Environment
- env.log.dir: (Defaults to the log directory under Flink's home) Defines the directory where the Flink logs are saved. It has to be an absolute path.
- env.log.max: (Default: 5) The maximum number of old log files to keep.
- env.ssh.opts: Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).
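For instance, a sketch of the logging-related environment keys (the directory path is an illustrative assumption):

```yaml
# Keep more rotated log files and write them to an absolute path
env.log.dir: /var/log/flink
env.log.max: 10
```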
Configuring TaskManager processing slots
Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.
Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good default for taskmanager.numberOfTaskSlots.
When starting a Flink application, users can supply the default number of slots to use for that job. The command line value therefore is called -p (for parallelism). In addition, it is possible to set the number of slots in the programming APIs for the whole application and for individual operators.