美文网首页
聊聊flink的NetworkEnvironmentConfig

聊聊flink的NetworkEnvironmentConfig

作者: go4it | 来源:发表于2019-02-24 12:25 被阅读14次

    本文主要研究一下flink的NetworkEnvironmentConfiguration

    NetworkEnvironmentConfiguration

    flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

    public class NetworkEnvironmentConfiguration {
    
        private final float networkBufFraction;
    
        private final long networkBufMin;
    
        private final long networkBufMax;
    
        private final int networkBufferSize;
    
        private final IOMode ioMode;
    
        private final int partitionRequestInitialBackoff;
    
        private final int partitionRequestMaxBackoff;
    
        private final int networkBuffersPerChannel;
    
        private final int floatingNetworkBuffersPerGate;
    
        private final NettyConfig nettyConfig;
    
        /**
         * Constructor for a setup with purely local communication (no netty).
         */
        public NetworkEnvironmentConfiguration(
                float networkBufFraction,
                long networkBufMin,
                long networkBufMax,
                int networkBufferSize,
                IOMode ioMode,
                int partitionRequestInitialBackoff,
                int partitionRequestMaxBackoff,
                int networkBuffersPerChannel,
                int floatingNetworkBuffersPerGate) {
    
            this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
                    ioMode,
                    partitionRequestInitialBackoff, partitionRequestMaxBackoff,
                    networkBuffersPerChannel, floatingNetworkBuffersPerGate,
                    null);
            
        }
    
        public NetworkEnvironmentConfiguration(
                float networkBufFraction,
                long networkBufMin,
                long networkBufMax,
                int networkBufferSize,
                IOMode ioMode,
                int partitionRequestInitialBackoff,
                int partitionRequestMaxBackoff,
                int networkBuffersPerChannel,
                int floatingNetworkBuffersPerGate,
                @Nullable NettyConfig nettyConfig) {
    
            this.networkBufFraction = networkBufFraction;
            this.networkBufMin = networkBufMin;
            this.networkBufMax = networkBufMax;
            this.networkBufferSize = networkBufferSize;
            this.ioMode = ioMode;
            this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
            this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
            this.networkBuffersPerChannel = networkBuffersPerChannel;
            this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
            this.nettyConfig = nettyConfig;
        }
    
        // ------------------------------------------------------------------------
    
        public float networkBufFraction() {
            return networkBufFraction;
        }
    
        public long networkBufMin() {
            return networkBufMin;
        }
    
        public long networkBufMax() {
            return networkBufMax;
        }
    
        public int networkBufferSize() {
            return networkBufferSize;
        }
    
        public IOMode ioMode() {
            return ioMode;
        }
    
        public int partitionRequestInitialBackoff() {
            return partitionRequestInitialBackoff;
        }
    
        public int partitionRequestMaxBackoff() {
            return partitionRequestMaxBackoff;
        }
    
        public int networkBuffersPerChannel() {
            return networkBuffersPerChannel;
        }
    
        public int floatingNetworkBuffersPerGate() {
            return floatingNetworkBuffersPerGate;
        }
    
        public NettyConfig nettyConfig() {
            return nettyConfig;
        }
    
        // ------------------------------------------------------------------------
    
        @Override
        public int hashCode() {
            int result = 1;
            result = 31 * result + networkBufferSize;
            result = 31 * result + ioMode.hashCode();
            result = 31 * result + partitionRequestInitialBackoff;
            result = 31 * result + partitionRequestMaxBackoff;
            result = 31 * result + networkBuffersPerChannel;
            result = 31 * result + floatingNetworkBuffersPerGate;
            result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
            return result;
        }
    
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            else if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            else {
                final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;
    
                return this.networkBufFraction == that.networkBufFraction &&
                        this.networkBufMin == that.networkBufMin &&
                        this.networkBufMax == that.networkBufMax &&
                        this.networkBufferSize == that.networkBufferSize &&
                        this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
                        this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
                        this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
                        this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
                        this.ioMode == that.ioMode && 
                        (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
            }
        }
    
        @Override
        public String toString() {
            return "NetworkEnvironmentConfiguration{" +
                    "networkBufFraction=" + networkBufFraction +
                    ", networkBufMin=" + networkBufMin +
                    ", networkBufMax=" + networkBufMax +
                    ", networkBufferSize=" + networkBufferSize +
                    ", ioMode=" + ioMode +
                    ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
                    ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
                    ", networkBuffersPerChannel=" + networkBuffersPerChannel +
                    ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
                    ", nettyConfig=" + nettyConfig +
                    '}';
        }
    }
    
    • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性

    TaskManagerServicesConfiguration

    flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

    public class TaskManagerServicesConfiguration {
    
        //......
    
        /**
         * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
         *
         * @param configuration to create the network environment configuration from
         * @param localTaskManagerCommunication true if task manager communication is local
         * @param taskManagerAddress address of the task manager
         * @param slots to start the task manager with
         * @return Network environment configuration
         */
        @SuppressWarnings("deprecation")
        private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
            Configuration configuration,
            boolean localTaskManagerCommunication,
            InetAddress taskManagerAddress,
            int slots) throws Exception {
    
            // ----> hosts / ports for communication and data exchange
    
            int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
    
            checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
                "Leave config parameter empty or use 0 to let the system choose a port automatically.");
    
            checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
                "Number of task slots must be at least one.");
    
            final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
    
            // check page size of for minimum size
            checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
                TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
    
            // check page size for power of two
            checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
                TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                "Memory segment size must be a power of 2.");
    
            // network buffer memory fraction
    
            float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
            long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
            long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
            checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);
    
            // fallback: number of network buffers
            final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
            checkNetworkConfigOld(numNetworkBuffers);
    
            if (!hasNewNetworkBufConf(configuration)) {
                // map old config to new one:
                networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
            } else {
                if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
                    LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
                        TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
                }
            }
    
            final NettyConfig nettyConfig;
            if (!localTaskManagerCommunication) {
                final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
    
                nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
                    taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
            } else {
                nettyConfig = null;
            }
    
            // Default spill I/O mode for intermediate results
            final String syncOrAsync = configuration.getString(
                ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
                ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
    
            final IOManager.IOMode ioMode;
            if (syncOrAsync.equals("async")) {
                ioMode = IOManager.IOMode.ASYNC;
            } else {
                ioMode = IOManager.IOMode.SYNC;
            }
    
            int initialRequestBackoff = configuration.getInteger(
                TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
            int maxRequestBackoff = configuration.getInteger(
                TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
    
            int buffersPerChannel = configuration.getInteger(
                TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
            int extraBuffersPerGate = configuration.getInteger(
                TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
    
            return new NetworkEnvironmentConfiguration(
                networkBufFraction,
                networkBufMin,
                networkBufMax,
                pageSize,
                ioMode,
                initialRequestBackoff,
                maxRequestBackoff,
                buffersPerChannel,
                extraBuffersPerGate,
                nettyConfig);
        }
    
        //......
    }
    
    • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置

    TaskManagerOptions

    flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

    @PublicEvolving
    public class TaskManagerOptions {
        //......
    
        /**
         * Size of memory buffers used by the network stack and the memory manager.
         */
        public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
                key("taskmanager.memory.segment-size")
                .defaultValue("32kb")
                .withDescription("Size of memory buffers used by the network stack and the memory manager.");
    
        /**
         * Fraction of JVM memory to use for network buffers.
         */
        public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
                key("taskmanager.network.memory.fraction")
                .defaultValue(0.1f)
                .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
                    " data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
                    " are. If a job is rejected or you get a warning that the system has not enough buffers available," +
                    " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
                    "` and \"taskmanager.network.memory.max\" may override this fraction.");
    
        /**
         * Minimum memory size for network buffers.
         */
        public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
                key("taskmanager.network.memory.min")
                .defaultValue("64mb")
                .withDescription("Minimum memory size for network buffers.");
    
        /**
         * Maximum memory size for network buffers.
         */
        public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
                key("taskmanager.network.memory.max")
                .defaultValue("1gb")
                .withDescription("Maximum memory size for network buffers.");
    
        /**
         * Number of buffers used in the network stack. This defines the number of possible tasks and
         * shuffles.
         *
         * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
         * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
         */
        @Deprecated
        public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
                key("taskmanager.network.numberOfBuffers")
                .defaultValue(2048);
    
        /**
         * Minimum backoff for partition requests of input channels.
         */
        public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
                key("taskmanager.network.request-backoff.initial")
                .defaultValue(100)
                .withDeprecatedKeys("taskmanager.net.request-backoff.initial")
                .withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
    
        /**
         * Maximum backoff for partition requests of input channels.
         */
        public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
                key("taskmanager.network.request-backoff.max")
                .defaultValue(10000)
                .withDeprecatedKeys("taskmanager.net.request-backoff.max")
                .withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
    
        /**
         * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
         *
         * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
         */
        public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
                key("taskmanager.network.memory.buffers-per-channel")
                .defaultValue(2)
                .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
                    "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
                    " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
                    " for parallel serialization.");
    
        /**
         * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
         */
        public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
                key("taskmanager.network.memory.floating-buffers-per-gate")
                .defaultValue(8)
                .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
                    " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
                    " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
                    " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
                    " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");                                                                                                        
        //......
    }
    
    • taskmanager.memory.segment-size指定memory segment的大小,默认为32kb;taskmanager.network.memory.fraction指定network buffers使用的memory的比例,默认为0.1;taskmanager.network.memory.min指定network buffers使用的最小内存,默认为64mb;taskmanager.network.memory.max指定network buffers使用的最大内存,默认为1gb;taskmanager.network.numberOfBuffers指定network使用的buffers数量,默认为2048,该配置已经被废弃,使用taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max这几个配置来替代
    • taskmanager.network.request-backoff.initial指定input channels的partition requests的最小backoff时间(毫秒),默认为100;taskmanager.network.request-backoff.max指定input channels的partition requests的最大backoff时间(毫秒),默认为10000
    • taskmanager.network.memory.buffers-per-channel指定每个outgoing/incoming channel使用buffers数量,默认为2;taskmanager.network.memory.floating-buffers-per-gate指定每个outgoing/incoming gate使用buffers数量,默认为8

    NettyConfig

    flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java

    public class NettyConfig {
    
        private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
    
        // - Config keys ----------------------------------------------------------
    
        public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
                .key("taskmanager.network.netty.num-arenas")
                .defaultValue(-1)
                .withDeprecatedKeys("taskmanager.net.num-arenas")
                .withDescription("The number of Netty arenas.");
    
        public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
                .key("taskmanager.network.netty.server.numThreads")
                .defaultValue(-1)
                .withDeprecatedKeys("taskmanager.net.server.numThreads")
                .withDescription("The number of Netty server threads.");
    
        public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
                .key("taskmanager.network.netty.client.numThreads")
                .defaultValue(-1)
                .withDeprecatedKeys("taskmanager.net.client.numThreads")
                .withDescription("The number of Netty client threads.");
    
        public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
                .key("taskmanager.network.netty.server.backlog")
                .defaultValue(0) // default: 0 => Netty's default
                .withDeprecatedKeys("taskmanager.net.server.backlog")
                .withDescription("The netty server connection backlog.");
    
        public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
                .key("taskmanager.network.netty.client.connectTimeoutSec")
                .defaultValue(120) // default: 120s = 2min
                .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
                .withDescription("The Netty client connection timeout.");
    
        public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
                .key("taskmanager.network.netty.sendReceiveBufferSize")
                .defaultValue(0) // default: 0 => Netty's default
                .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
                .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
                    " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
    
        public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
                .key("taskmanager.network.netty.transport")
                .defaultValue("nio")
                .withDeprecatedKeys("taskmanager.net.transport")
                .withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
    
        // ------------------------------------------------------------------------
    
        enum TransportType {
            NIO, EPOLL, AUTO
        }
    
        static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
    
        static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
    
        private final InetAddress serverAddress;
    
        private final int serverPort;
    
        private final int memorySegmentSize;
    
        private final int numberOfSlots;
    
        private final Configuration config; // optional configuration
    
        public NettyConfig(
                InetAddress serverAddress,
                int serverPort,
                int memorySegmentSize,
                int numberOfSlots,
                Configuration config) {
    
            this.serverAddress = checkNotNull(serverAddress);
    
            checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");
            this.serverPort = serverPort;
    
            checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
            this.memorySegmentSize = memorySegmentSize;
    
            checkArgument(numberOfSlots > 0, "Number of slots");
            this.numberOfSlots = numberOfSlots;
    
            this.config = checkNotNull(config);
    
            LOG.info(this.toString());
        }
    
        InetAddress getServerAddress() {
            return serverAddress;
        }
    
        int getServerPort() {
            return serverPort;
        }
    
        int getMemorySegmentSize() {
            return memorySegmentSize;
        }
    
        public int getNumberOfSlots() {
            return numberOfSlots;
        }
    
        // ------------------------------------------------------------------------
        // Getters
        // ------------------------------------------------------------------------
    
        public int getServerConnectBacklog() {
            return config.getInteger(CONNECT_BACKLOG);
        }
    
        public int getNumberOfArenas() {
            // default: number of slots
            final int configValue = config.getInteger(NUM_ARENAS);
            return configValue == -1 ? numberOfSlots : configValue;
        }
    
        public int getServerNumThreads() {
            // default: number of task slots
            final int configValue = config.getInteger(NUM_THREADS_SERVER);
            return configValue == -1 ? numberOfSlots : configValue;
        }
    
        public int getClientNumThreads() {
            // default: number of task slots
            final int configValue = config.getInteger(NUM_THREADS_CLIENT);
            return configValue == -1 ? numberOfSlots : configValue;
        }
    
        public int getClientConnectTimeoutSeconds() {
            return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
        }
    
        public int getSendAndReceiveBufferSize() {
            return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
        }
    
        public TransportType getTransportType() {
            String transport = config.getString(TRANSPORT_TYPE);
    
            switch (transport) {
                case "nio":
                    return TransportType.NIO;
                case "epoll":
                    return TransportType.EPOLL;
                default:
                    return TransportType.AUTO;
            }
        }
    
        @Nullable
        public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
            return getSSLEnabled() ?
                    SSLUtils.createInternalClientSSLEngineFactory(config) :
                    null;
        }
    
        @Nullable
        public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
            return getSSLEnabled() ?
                    SSLUtils.createInternalServerSSLEngineFactory(config) :
                    null;
        }
    
        public boolean getSSLEnabled() {
            return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
                && SSLUtils.isInternalSSLEnabled(config);
        }
    
        public boolean isCreditBasedEnabled() {
            return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
        }
    
        public Configuration getConfig() {
            return config;
        }
    
        @Override
        public String toString() {
            String format = "NettyConfig [" +
                    "server address: %s, " +
                    "server port: %d, " +
                    "ssl enabled: %s, " +
                    "memory segment size (bytes): %d, " +
                    "transport type: %s, " +
                    "number of server threads: %d (%s), " +
                    "number of client threads: %d (%s), " +
                    "server connect backlog: %d (%s), " +
                    "client connect timeout (sec): %d, " +
                    "send/receive buffer size (bytes): %d (%s)]";
    
            String def = "use Netty's default";
            String man = "manual";
    
            return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
                    memorySegmentSize, getTransportType(), getServerNumThreads(),
                    getServerNumThreads() == 0 ? def : man,
                    getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
                    getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
                    getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
                    getSendAndReceiveBufferSize() == 0 ? def : man);
        }
    }
    
    • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
    • taskmanager.network.netty.server.backlog用于指定netty server的connection backlog,默认值为0即使用netty默认的配置;taskmanager.network.netty.client.connectTimeoutSec指定netty client的connection timeout,默认为120(单位秒);taskmanager.network.netty.sendReceiveBufferSize指定netty send/receive buffer大小,默认为0即使用netty的默认配置,默认是使用system buffer size,即/proc/sys/net/ipv4/tcp_[rw]mem的配置;taskmanager.network.netty.transport指定的是netty transport的类型,默认是nio
    • taskmanager.network.netty.num-arenas指定的是netty arenas的数量,默认为-1;taskmanager.network.netty.server.numThreads指定的是netty server的threads数量,默认为-1;taskmanager.network.netty.client.numThreads指定的是netty client的threads数量,默认为-1;这几个配置当配置值为-1的时候,对应get方法返回的是numberOfSlots值

    小结

    • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
    • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
    • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置

    doc

    相关文章

      网友评论

          本文标题:聊聊flink的NetworkEnvironmentConfig

          本文链接:https://www.haomeiwen.com/subject/vpivyqtx.html